You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 11:27:06 UTC

[pulsar] branch branch-2.7 updated (4fc2e07 -> 78e7b20)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 4fc2e07  Keep topic-level policies commands consistent with that for namespace… (#9215)
     new dc21282  Fix fake complete issue in offloading (#9306)
     new 78e7b20  Add refresh authentication command in broker (#9064)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 23 ++++++-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 49 ++++++++++++++
 .../authentication/SaslAuthenticateTest.java       |  2 +-
 .../broker/authentication/AuthenticationState.java |  4 +-
 .../AuthenticationProviderTokenTest.java           |  2 +-
 .../client/api/MutualAuthenticationTest.java       |  2 +-
 ...kenOauth2AuthenticatedProducerConsumerTest.java | 76 ++++++++++++++++++++++
 .../org/apache/pulsar/common/api/AuthData.java     |  5 +-
 .../client/impl/auth/AuthenticationSasl.java       |  2 +-
 .../impl/auth/SaslAuthenticationDataProvider.java  |  2 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 14 +++-
 .../pulsar/proxy/server/DirectProxyHandler.java    | 13 +++-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  2 +-
 .../token/PulsarTokenAuthenticationBaseSuite.java  | 11 +++-
 14 files changed, 191 insertions(+), 16 deletions(-)


[pulsar] 02/02: Add refresh authentication command in broker (#9064)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 78e7b2069f25d3da96e8204e0dac27d44448602f
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Jan 26 18:11:04 2021 +0800

    Add refresh authentication command in broker (#9064)
    
    * Add refresh authentication command in the broker
    ---
    
    **Motivation**
    
    Some authentication providers use cached authentication data and only
    provide new auth data once 'getAuthData' is called again. So this change
    adds a refresh command; when the command is received, the authentication
    data provider is refreshed.
    
    **Verify this change**
    
    Existing tests pass.
    
    (cherry picked from commit 373948ee1df6294ef43dafe457a86e53cd86ab32)
---
 .../authentication/SaslAuthenticateTest.java       |  2 +-
 .../broker/authentication/AuthenticationState.java |  4 +-
 .../AuthenticationProviderTokenTest.java           |  2 +-
 .../client/api/MutualAuthenticationTest.java       |  2 +-
 ...kenOauth2AuthenticatedProducerConsumerTest.java | 76 ++++++++++++++++++++++
 .../org/apache/pulsar/common/api/AuthData.java     |  5 +-
 .../client/impl/auth/AuthenticationSasl.java       |  2 +-
 .../impl/auth/SaslAuthenticationDataProvider.java  |  2 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 14 +++-
 .../pulsar/proxy/server/DirectProxyHandler.java    | 13 +++-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  2 +-
 .../token/PulsarTokenAuthenticationBaseSuite.java  | 11 +++-
 12 files changed, 121 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index e94d2ae..bb20077 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -260,7 +260,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
 
         // auth between server and client.
         // first time auth
-        AuthData initData1 = dataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData initData1 = dataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         AuthData serverData1 = authState.authenticate(initData1);
         boolean complete = authState.isComplete();
         assertFalse(complete);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
index ac881ac..f51e818 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
@@ -68,7 +68,7 @@ public interface AuthenticationState {
 
     /**
      * If the authentication state supports refreshing and the credentials are expired,
-     * the auth provider will call this method ot initiate the refresh process.
+     * the auth provider will call this method to initiate the refresh process.
      * <p>
      * The auth state here will return the broker side data that will be used to send
      * a challenge to the client.
@@ -77,6 +77,6 @@ public interface AuthenticationState {
      * @throws AuthenticationException
      */
     default AuthData refreshAuthentication() throws AuthenticationException {
-        return null;
+        return AuthData.REFRESH_AUTH_DATA;
     }
 }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index 60fa5f6..c2610aa 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -634,7 +634,7 @@ public class AuthenticationProviderTokenTest {
         assertTrue(authState.isComplete());
 
         AuthData brokerData = authState.refreshAuthentication();
-        assertNull(brokerData);
+        assertEquals(brokerData, AuthData.REFRESH_AUTH_DATA);
     }
 
     // tests for Token Audience
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index ef06323..458c4ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -70,7 +70,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
             String dataString = new String(data.getBytes(), UTF_8);
             AuthData toSend;
 
-            if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA)) {
+            if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) {
                 toSend = AuthData.of(clientAuthStrings[0].getBytes(UTF_8));
             } else if (Arrays.equals(dataString.getBytes(), serverAuthStrings[0].getBytes(UTF_8))) {
                 toSend = AuthData.of(clientAuthStrings[1].getBytes(UTF_8));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index cfb80dc..b4e7bb0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -25,15 +25,19 @@ import java.net.URI;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -62,6 +66,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
     protected void setup() throws Exception {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationRefreshCheckSeconds(5);
 
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add(ADMIN_ROLE);
@@ -157,4 +162,75 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testOAuth2TokenRefreshedWithoutReconnect() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        clientSetup();
+
+        // test rest by admin
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("my-property",
+            new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+            .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+
+        // get the first connection stats
+        ProducerImpl producerImpl = (ProducerImpl) producer;
+        String accessTokenOld = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
+        long lastDisconnectTime = producer.getLastDisconnectedTimestamp();
+
+        // the token expire duration is 10 seconds, so we need to wait for the authenticationData refreshed
+        Awaitility.await()
+            .atLeast(10, TimeUnit.SECONDS)
+            .atMost(20, TimeUnit.SECONDS)
+            .with()
+            .pollInterval(Duration.ofSeconds(1))
+            .untilAsserted(() -> {
+                String accessTokenNew = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
+                Assert.assertNotEquals(accessTokenOld, accessTokenNew);
+            });
+
+        // get the lastDisconnectTime, it should be same with the before, because the connection shouldn't disconnect
+        long lastDisconnectTimeAfterTokenExpired = producer.getLastDisconnectedTimestamp();
+        Assert.assertEquals(lastDisconnectTime, lastDisconnectTimeAfterTokenExpired);
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        msg = null;
+        messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
index f12c436..188cb27 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
@@ -28,7 +28,10 @@ import lombok.Data;
 @Data(staticConstructor = "of")
 public final class AuthData {
     // CHECKSTYLE.OFF: StaticVariableName
-    public static byte[] INIT_AUTH_DATA = "PulsarAuthInit".getBytes(UTF_8);
+    public static byte[] INIT_AUTH_DATA_BYTES = "PulsarAuthInit".getBytes(UTF_8);
+    public static byte[] REFRESH_AUTH_DATA_BYTES = "PulsarAuthRefresh".getBytes(UTF_8);
+    public static AuthData INIT_AUTH_DATA = AuthData.of(INIT_AUTH_DATA_BYTES);
+    public static AuthData REFRESH_AUTH_DATA = AuthData.of(REFRESH_AUTH_DATA_BYTES);
     // CHECKSTYLE.ON: StaticVariableName
 
     private final byte[] bytes;
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index ee850e1..54d1802 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -261,7 +261,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
             }
             // first time init
             headers.put(SASL_HEADER_STATE, SASL_STATE_CLIENT_INIT);
-            AuthData initData = authData.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+            AuthData initData = authData.authenticate(AuthData.INIT_AUTH_DATA);
             headers.put(SASL_AUTH_TOKEN,
                 Base64.getEncoder().encodeToString(initData.getBytes()));
         } else {
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
index 261e06c..daae735 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
@@ -51,7 +51,7 @@ public class SaslAuthenticationDataProvider implements AuthenticationDataProvide
     @Override
     public AuthData authenticate(AuthData commandData) throws AuthenticationException {
         // init
-        if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA)) {
+        if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) {
             if (pulsarSaslClient.hasInitialResponse()) {
                 return pulsarSaslClient.evaluateChallenge(AuthData.of(new byte[0]));
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bbcba68..c551588 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -37,6 +37,7 @@ import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
@@ -138,6 +139,7 @@ public class ClientCnx extends PulsarHandler {
     private ScheduledFuture<?> timeoutTask;
 
     // Added for mutual authentication.
+    @Getter
     protected AuthenticationDataProvider authenticationDataProvider;
     private TransactionBufferHandler transactionBufferHandler;
 
@@ -226,7 +228,7 @@ public class ClientCnx extends PulsarHandler {
         // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
         // and return authData to server.
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
                 PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, null, null);
     }
@@ -318,6 +320,16 @@ public class ClientCnx extends PulsarHandler {
         checkArgument(authChallenge.hasChallenge());
         checkArgument(authChallenge.getChallenge().hasAuthData());
 
+        if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData().toByteArray())) {
+            try {
+                authenticationDataProvider = authentication.getAuthData(remoteHostName);
+            } catch (PulsarClientException e) {
+                log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+                connectionFuture.completeExceptionally(e);
+                return;
+            }
+        }
+
         // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
         try {
             AuthData authData = authenticationDataProvider
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 44d7ad5..e7233a6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -46,6 +46,7 @@ import io.netty.util.concurrent.FutureListener;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
@@ -57,6 +58,7 @@ import lombok.Getter;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.tls.TlsHostnameVerifier;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
@@ -222,7 +224,7 @@ public class DirectProxyHandler {
             this.ctx = ctx;
             // Send the Connect command to broker
             authenticationDataProvider = authentication.getAuthData(remoteHostName);
-            AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+            AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             ByteBuf command = null;
             command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
@@ -262,6 +264,15 @@ public class DirectProxyHandler {
             checkArgument(authChallenge.hasChallenge());
             checkArgument(authChallenge.getChallenge().hasAuthData() && authChallenge.getChallenge().hasAuthData());
 
+            if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
+                try {
+                    authenticationDataProvider = authentication.getAuthData(remoteHostName);
+                } catch (PulsarClientException e) {
+                    log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+                    return;
+                }
+            }
+
             // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
             try {
                 AuthData authData = authenticationDataProvider
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 0878370..665b9f8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -54,7 +54,7 @@ public class ProxyClientCnx extends ClientCnx {
         }
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
             PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
             clientAuthMethod);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index 7a923ee..eb96555 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -288,6 +290,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
         admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE, EnumSet.allOf(AuthAction.class));
 
         String initialToken = this.createClientTokenWithExpiry(5, TimeUnit.SECONDS);
+        String refreshedToken = this.createClientTokenWithExpiry(30, TimeUnit.SECONDS);
 
         @Cleanup
         PulsarClient client = PulsarClient.builder()
@@ -295,7 +298,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
                 .authentication(AuthenticationFactory.token(() -> {
                     if (shouldRefreshToken) {
                         try {
-                            return createClientTokenWithExpiry(5, TimeUnit.SECONDS);
+                            return refreshedToken;
                         } catch (Exception e) {
                             return null;
                         }
@@ -308,17 +311,19 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
         @Cleanup
         Producer<String> producer = client.newProducer(Schema.STRING)
                 .topic(topic)
-                .sendTimeout(1, TimeUnit.SECONDS)
+                .sendTimeout(3, TimeUnit.SECONDS)
                 .create();
-
         // Initially the token is valid and producer will be able to publish
         producer.send("hello-1");
+        long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp();
 
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
 
         if (shouldRefreshToken) {
             // The token will have been refreshed, so the app won't see any error
             producer.send("hello-2");
+            long timestamp = producer.getLastDisconnectedTimestamp();
+            assertEquals(timestamp, lastDisconnectedTimestamp);
         } else {
             // The token has expired, so this next message will be rejected
             try {


[pulsar] 01/02: Fix fake complete issue in offloading (#9306)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dc21282348c45228226722e5e458aab49ed5b072
Author: Renkai Ge <ga...@gmail.com>
AuthorDate: Tue Jan 26 17:23:26 2021 +0800

    Fix fake complete issue in offloading (#9306)
    
    ### Motivation
    In our current code, `complete` in the offload context may be set to true even when syncing the metadata to ZooKeeper has failed. This can lead to more fatal errors: for example, the data in BookKeeper may be deleted while another managed ledger still sees the data as not offloaded and tries to read it from BookKeeper.
    
    ### Modification
    This PR makes sure the local ledger info is updated only after the ZooKeeper update has succeeded.
    * prevent the ledger info from changing unless the write to ZK succeeds
    * add a unit test to prevent a false positive when the offload fails
    
    (cherry picked from commit 3c22b473ddb124941d4bc9044ed6caaad97fab53)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 23 +++++++++-
 .../bookkeeper/mledger/impl/OffloadPrefixTest.java | 49 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1e6b898..10c54bb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2637,12 +2637,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 } else {
                     try {
                         LedgerInfo newInfo = transformation.transform(oldInfo);
-                        ledgers.put(ledgerId, newInfo);
-                        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat,
+                        final HashMap<Long, LedgerInfo> newLedgers = new HashMap<>(ledgers);
+                        newLedgers.put(ledgerId, newInfo);
+                        store.asyncUpdateLedgerIds(name, buildManagedLedgerInfo(newLedgers), ledgersStat,
                                 new MetaStoreCallback<Void>() {
                                     @Override
                                     public void operationComplete(Void result, Stat stat) {
                                         ledgersStat = stat;
+                                        ledgers.put(ledgerId, newInfo);
                                         unlockingPromise.complete(null);
                                     }
 
@@ -3111,6 +3113,23 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return mlInfo.build();
     }
 
+    private ManagedLedgerInfo buildManagedLedgerInfo(Map<Long, LedgerInfo> ledgers) {
+        ManagedLedgerInfo.Builder mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values());
+        if (state == State.Terminated) {
+            mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
+                    .setEntryId(lastConfirmedEntry.getEntryId()));
+        }
+        if (managedLedgerInterceptor != null) {
+            managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap);
+        }
+        for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
+            mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
+                    .setKey(property.getKey()).setValue(property.getValue()));
+        }
+
+        return mlInfo.build();
+    }
+
     /**
      * Throws an exception if the managed ledger has been previously fenced.
      *
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
index 807876e..81080db 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java
@@ -47,11 +47,13 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.zookeeper.MockZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -979,8 +981,13 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
     }
 
     static class MockLedgerOffloader implements LedgerOffloader {
+        interface InjectAfterOffload {
+            void call();
+        }
+
         ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap<Long, UUID>();
         ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap<Long, UUID>();
+        InjectAfterOffload inject = null;
 
         Set<Long> offloadedLedgers() {
             return offloads.keySet();
@@ -1012,6 +1019,10 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
             } else {
                 promise.completeExceptionally(new Exception("Already exists exception"));
             }
+
+            if (inject != null) {
+                inject.call();
+            }
             return promise;
         }
 
@@ -1047,6 +1058,44 @@ public class OffloadPrefixTest extends MockedBookKeeperTestCase {
         }
     }
 
+    @Test
+    public void testFailByZk() throws Exception {
+        MockLedgerOffloader offloader = new MockLedgerOffloader();
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(10);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        config.setRetentionTime(10, TimeUnit.MINUTES);
+        config.setRetentionSizeInMB(10);
+        config.setLedgerOffloader(offloader);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
+
+        int i = 0;
+        for (; i < 25; i++) {
+            String content = "entry-" + i;
+            ledger.addEntry(content.getBytes());
+        }
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+
+        offloader.inject = () -> {
+            try {
+                stopZooKeeper();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        };
+
+        try {
+            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
+        } catch (Exception e) {
+
+        }
+        final LedgerInfo ledgerInfo = ledger.getLedgersInfoAsList().get(0);
+        final MLDataFormats.OffloadContext offloadContext = ledgerInfo.getOffloadContext();
+        //should not set complete when
+        assertEquals(offloadContext.getComplete(), false);
+        zkc = MockZooKeeper.newInstance();
+    }
+
     static class ErroringMockLedgerOffloader extends MockLedgerOffloader {
         CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture<>();