You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/15 13:43:45 UTC

[pulsar] branch master updated: [fix][broker] Fix token expiration (#16016)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7576c930351 [fix][broker] Fix token expiration (#16016)
7576c930351 is described below

commit 7576c9303513ef8212452ff64a5a53ec7def6a5b
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Wed Jun 15 21:43:37 2022 +0800

    [fix][broker] Fix token expiration (#16016)
    
    ### Motivation
    
    When a token expires, the broker asks the client to refresh it. When the broker receives the auth response, it runs `org.apache.pulsar.broker.service.ServerCnx#doAuthentication`, which calls `org.apache.pulsar.broker.authentication.AuthenticationState#authenticate` to authenticate. However, `org.apache.pulsar.broker.authentication.AuthenticationProviderToken.TokenAuthenticationState#authenticate` doesn't do anything, which causes a loop of token refresh requests.
    
    Right now the token is only validated in the `TokenAuthenticationState` constructor; we need to add a check to the `authenticate` method as well.
---
 .../AuthenticationProviderToken.java               |   3 +-
 .../pulsar/client/api/ProducerConsumerBase.java    |   6 +
 .../api/TokenExpirationProduceConsumerTest.java    | 169 +++++++++++++++++++++
 3 files changed, 177 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index 164d5ee672c..ccd4e8e1aab 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -356,7 +356,8 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
          */
         @Override
         public AuthData authenticate(AuthData authData) throws AuthenticationException {
-            // There's no additional auth stage required
+            String token = new String(authData.getBytes(), UTF_8);
+            checkExpiration(token);
             return null;
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index f96f8e47a26..a7a14568f6e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -31,6 +31,12 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 
 public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest {
+    protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    protected final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    protected final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+    protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+    protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+
     protected String methodName;
 
     @BeforeMethod(alwaysRun = true)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
new file mode 100644
index 00000000000..6b26d174042
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenExpirationProduceConsumerTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import javax.crypto.SecretKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = "broker-api")
+@Slf4j
+public class TokenExpirationProduceConsumerTest extends TlsProducerConsumerBase {
+    private final String tenant ="my-tenant";
+    private final NamespaceName namespaceName = NamespaceName.get("my-tenant","my-ns");
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // TLS configuration for Broker
+        internalSetUpForBroker();
+
+        // Start Broker
+        super.init();
+
+        admin = getAdmin(ADMIN_TOKEN);
+        admin.clusters().createCluster(configClusterName,
+                ClusterData.builder()
+                        .serviceUrl(brokerUrl.toString())
+                        .serviceUrlTls(brokerUrlTls.toString())
+                        .brokerServiceUrl(pulsar.getBrokerServiceUrl())
+                        .brokerServiceUrlTls(pulsar.getBrokerServiceUrlTls())
+                        .build());
+        admin.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(configClusterName)));
+        admin.namespaces().createNamespace(namespaceName.toString());
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    public static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
+
+    public String getExpireToken(String role, Date date) {
+        return Jwts.builder().setSubject(role).signWith(SECRET_KEY)
+                .setExpiration(date).compact();
+    }
+
+    protected void internalSetUpForBroker() {
+        conf.setBrokerServicePortTls(Optional.of(0));
+        conf.setWebServicePortTls(Optional.of(0));
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        conf.setClusterName(configClusterName);
+        conf.setAuthenticationRefreshCheckSeconds(1);
+        conf.setTlsRequireTrustedClientCertOnConnect(false);
+        conf.setTlsAllowInsecureConnection(false);
+        conf.setAuthenticationEnabled(true);
+        conf.setTransactionCoordinatorEnabled(true);
+        conf.setSuperUserRoles(Sets.newHashSet("admin"));
+        conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
+                + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
+    }
+
+    private PulsarClient getClient(String token) throws Exception {
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrlTls())
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .enableTls(true)
+                .allowTlsInsecureConnection(false)
+                .enableTlsHostnameVerification(true)
+                .authentication(AuthenticationToken.class.getName(),"token:" +token)
+                .operationTimeout(1000, TimeUnit.MILLISECONDS);
+        return clientBuilder.build();
+    }
+
+    private PulsarAdmin getAdmin(String token) throws Exception {
+        PulsarAdminBuilder clientBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsar.getWebServiceAddressTls())
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .allowTlsInsecureConnection(false)
+                .authentication(AuthenticationToken.class.getName(),"token:" +token)
+                .enableTlsHostnameVerification(true);
+        return clientBuilder.build();
+    }
+
+    @Test
+    public void testTokenExpirationProduceConsumer() throws Exception {
+        Calendar calendar = Calendar.getInstance();
+        calendar.add(Calendar.SECOND, 20);
+        String role = "test";
+        String token = getExpireToken(role, calendar.getTime());
+        Date expiredTime = calendar.getTime();
+
+        Set<AuthAction> permissions = new HashSet<>();
+        permissions.add(AuthAction.consume);
+        permissions.add(AuthAction.produce);
+        admin.namespaces().grantPermissionOnNamespace(namespaceName.toString(), role, permissions);
+
+        @Cleanup
+        PulsarClient pulsarClient = getClient(token);
+        String topic = namespaceName + "/test-token";
+
+        @Cleanup final Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test-token")
+                .subscribe();
+        @Cleanup final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        Awaitility.await().timeout(Duration.ofSeconds(60)).pollInterval(3, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertThrows(PulsarClientException.TimeoutException.class, () -> {
+                producer.send("heart beat".getBytes());
+                Message<byte[]> message = consumer.receive();
+                consumer.acknowledge(message);
+            });
+            assertTrue(new Date().compareTo(expiredTime) > 0);
+        });
+    }
+}