You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/26 11:27:08 UTC

[pulsar] 02/02: Add refresh authentication command in broker (#9064)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 78e7b2069f25d3da96e8204e0dac27d44448602f
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Jan 26 18:11:04 2021 +0800

    Add refresh authentication command in broker (#9064)
    
    * Add refresh authentication command in the broker
    ---
    
    **Motivation**
    
    Some authentication providers use cached authentication data and only
    provide new auth data once 'getAuthData' is called again. So I add a
    refresh command, and the client refreshes its authentication data
    provider when it receives that command.
    
    **Verify this change**
    
    Existing tests pass.
    
    (cherry picked from commit 373948ee1df6294ef43dafe457a86e53cd86ab32)
---
 .../authentication/SaslAuthenticateTest.java       |  2 +-
 .../broker/authentication/AuthenticationState.java |  4 +-
 .../AuthenticationProviderTokenTest.java           |  2 +-
 .../client/api/MutualAuthenticationTest.java       |  2 +-
 ...kenOauth2AuthenticatedProducerConsumerTest.java | 76 ++++++++++++++++++++++
 .../org/apache/pulsar/common/api/AuthData.java     |  5 +-
 .../client/impl/auth/AuthenticationSasl.java       |  2 +-
 .../impl/auth/SaslAuthenticationDataProvider.java  |  2 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   | 14 +++-
 .../pulsar/proxy/server/DirectProxyHandler.java    | 13 +++-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  2 +-
 .../token/PulsarTokenAuthenticationBaseSuite.java  | 11 +++-
 12 files changed, 121 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index e94d2ae..bb20077 100644
--- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -260,7 +260,7 @@ public class SaslAuthenticateTest extends ProducerConsumerBase {
 
         // auth between server and client.
         // first time auth
-        AuthData initData1 = dataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData initData1 = dataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         AuthData serverData1 = authState.authenticate(initData1);
         boolean complete = authState.isComplete();
         assertFalse(complete);
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
index ac881ac..f51e818 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java
@@ -68,7 +68,7 @@ public interface AuthenticationState {
 
     /**
      * If the authentication state supports refreshing and the credentials are expired,
-     * the auth provider will call this method ot initiate the refresh process.
+     * the auth provider will call this method to initiate the refresh process.
      * <p>
      * The auth state here will return the broker side data that will be used to send
      * a challenge to the client.
@@ -77,6 +77,6 @@ public interface AuthenticationState {
      * @throws AuthenticationException
      */
     default AuthData refreshAuthentication() throws AuthenticationException {
-        return null;
+        return AuthData.REFRESH_AUTH_DATA;
     }
 }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index 60fa5f6..c2610aa 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -634,7 +634,7 @@ public class AuthenticationProviderTokenTest {
         assertTrue(authState.isComplete());
 
         AuthData brokerData = authState.refreshAuthentication();
-        assertNull(brokerData);
+        assertEquals(brokerData, AuthData.REFRESH_AUTH_DATA);
     }
 
     // tests for Token Audience
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index ef06323..458c4ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -70,7 +70,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
             String dataString = new String(data.getBytes(), UTF_8);
             AuthData toSend;
 
-            if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA)) {
+            if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) {
                 toSend = AuthData.of(clientAuthStrings[0].getBytes(UTF_8));
             } else if (Arrays.equals(dataString.getBytes(), serverAuthStrings[0].getBytes(UTF_8))) {
                 toSend = AuthData.of(clientAuthStrings[1].getBytes(UTF_8));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index cfb80dc..b4e7bb0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -25,15 +25,19 @@ import java.net.URI;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -62,6 +66,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
     protected void setup() throws Exception {
         conf.setAuthenticationEnabled(true);
         conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationRefreshCheckSeconds(5);
 
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add(ADMIN_ROLE);
@@ -157,4 +162,75 @@ public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsum
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testOAuth2TokenRefreshedWithoutReconnect() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        clientSetup();
+
+        // test rest by admin
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("my-property",
+            new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+            .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+
+        // get the first connection stats
+        ProducerImpl producerImpl = (ProducerImpl) producer;
+        String accessTokenOld = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
+        long lastDisconnectTime = producer.getLastDisconnectedTimestamp();
+
+        // the token expire duration is 10 seconds, so we need to wait for the authenticationData refreshed
+        Awaitility.await()
+            .atLeast(10, TimeUnit.SECONDS)
+            .atMost(20, TimeUnit.SECONDS)
+            .with()
+            .pollInterval(Duration.ofSeconds(1))
+            .untilAsserted(() -> {
+                String accessTokenNew = producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
+                Assert.assertNotEquals(accessTokenOld, accessTokenNew);
+            });
+
+        // get the lastDisconnectTime, it should be same with the before, because the connection shouldn't disconnect
+        long lastDisconnectTimeAfterTokenExpired = producer.getLastDisconnectedTimestamp();
+        Assert.assertEquals(lastDisconnectTime, lastDisconnectTimeAfterTokenExpired);
+
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        msg = null;
+        messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
index f12c436..188cb27 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/api/AuthData.java
@@ -28,7 +28,10 @@ import lombok.Data;
 @Data(staticConstructor = "of")
 public final class AuthData {
     // CHECKSTYLE.OFF: StaticVariableName
-    public static byte[] INIT_AUTH_DATA = "PulsarAuthInit".getBytes(UTF_8);
+    public static byte[] INIT_AUTH_DATA_BYTES = "PulsarAuthInit".getBytes(UTF_8);
+    public static byte[] REFRESH_AUTH_DATA_BYTES = "PulsarAuthRefresh".getBytes(UTF_8);
+    public static AuthData INIT_AUTH_DATA = AuthData.of(INIT_AUTH_DATA_BYTES);
+    public static AuthData REFRESH_AUTH_DATA = AuthData.of(REFRESH_AUTH_DATA_BYTES);
     // CHECKSTYLE.ON: StaticVariableName
 
     private final byte[] bytes;
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
index ee850e1..54d1802 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java
@@ -261,7 +261,7 @@ public class AuthenticationSasl implements Authentication, EncodedAuthentication
             }
             // first time init
             headers.put(SASL_HEADER_STATE, SASL_STATE_CLIENT_INIT);
-            AuthData initData = authData.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+            AuthData initData = authData.authenticate(AuthData.INIT_AUTH_DATA);
             headers.put(SASL_AUTH_TOKEN,
                 Base64.getEncoder().encodeToString(initData.getBytes()));
         } else {
diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
index 261e06c..daae735 100644
--- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
+++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/SaslAuthenticationDataProvider.java
@@ -51,7 +51,7 @@ public class SaslAuthenticationDataProvider implements AuthenticationDataProvide
     @Override
     public AuthData authenticate(AuthData commandData) throws AuthenticationException {
         // init
-        if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA)) {
+        if (Arrays.equals(commandData.getBytes(), AuthData.INIT_AUTH_DATA_BYTES)) {
             if (pulsarSaslClient.hasInitialResponse()) {
                 return pulsarSaslClient.evaluateChallenge(AuthData.of(new byte[0]));
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index bbcba68..c551588 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -37,6 +37,7 @@ import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
@@ -138,6 +139,7 @@ public class ClientCnx extends PulsarHandler {
     private ScheduledFuture<?> timeoutTask;
 
     // Added for mutual authentication.
+    @Getter
     protected AuthenticationDataProvider authenticationDataProvider;
     private TransactionBufferHandler transactionBufferHandler;
 
@@ -226,7 +228,7 @@ public class ClientCnx extends PulsarHandler {
         // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
         // and return authData to server.
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
                 PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, null, null);
     }
@@ -318,6 +320,16 @@ public class ClientCnx extends PulsarHandler {
         checkArgument(authChallenge.hasChallenge());
         checkArgument(authChallenge.getChallenge().hasAuthData());
 
+        if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData().toByteArray())) {
+            try {
+                authenticationDataProvider = authentication.getAuthData(remoteHostName);
+            } catch (PulsarClientException e) {
+                log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+                connectionFuture.completeExceptionally(e);
+                return;
+            }
+        }
+
         // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
         try {
             AuthData authData = authenticationDataProvider
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 44d7ad5..e7233a6 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -46,6 +46,7 @@ import io.netty.util.concurrent.FutureListener;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
@@ -57,6 +58,7 @@ import lombok.Getter;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.tls.TlsHostnameVerifier;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
@@ -222,7 +224,7 @@ public class DirectProxyHandler {
             this.ctx = ctx;
             // Send the Connect command to broker
             authenticationDataProvider = authentication.getAuthData(remoteHostName);
-            AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+            AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             ByteBuf command = null;
             command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
@@ -262,6 +264,15 @@ public class DirectProxyHandler {
             checkArgument(authChallenge.hasChallenge());
             checkArgument(authChallenge.getChallenge().hasAuthData() && authChallenge.getChallenge().hasAuthData());
 
+            if (Arrays.equals(AuthData.REFRESH_AUTH_DATA_BYTES, authChallenge.getChallenge().getAuthData())) {
+                try {
+                    authenticationDataProvider = authentication.getAuthData(remoteHostName);
+                } catch (PulsarClientException e) {
+                    log.error("{} Error when refreshing authentication data provider: {}", ctx.channel(), e);
+                    return;
+                }
+            }
+
             // mutual authn. If auth not complete, continue auth; if auth complete, complete connectionFuture.
             try {
                 AuthData authData = authenticationDataProvider
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 0878370..665b9f8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -54,7 +54,7 @@ public class ProxyClientCnx extends ClientCnx {
         }
 
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
+        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
             PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
             clientAuthMethod);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index 7a923ee..eb96555 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -288,6 +290,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
         admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE, EnumSet.allOf(AuthAction.class));
 
         String initialToken = this.createClientTokenWithExpiry(5, TimeUnit.SECONDS);
+        String refreshedToken = this.createClientTokenWithExpiry(30, TimeUnit.SECONDS);
 
         @Cleanup
         PulsarClient client = PulsarClient.builder()
@@ -295,7 +298,7 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
                 .authentication(AuthenticationFactory.token(() -> {
                     if (shouldRefreshToken) {
                         try {
-                            return createClientTokenWithExpiry(5, TimeUnit.SECONDS);
+                            return refreshedToken;
                         } catch (Exception e) {
                             return null;
                         }
@@ -308,17 +311,19 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe
         @Cleanup
         Producer<String> producer = client.newProducer(Schema.STRING)
                 .topic(topic)
-                .sendTimeout(1, TimeUnit.SECONDS)
+                .sendTimeout(3, TimeUnit.SECONDS)
                 .create();
-
         // Initially the token is valid and producer will be able to publish
         producer.send("hello-1");
+        long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp();
 
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
 
         if (shouldRefreshToken) {
             // The token will have been refreshed, so the app won't see any error
             producer.send("hello-2");
+            long timestamp = producer.getLastDisconnectedTimestamp();
+            assertEquals(timestamp, lastDisconnectedTimestamp);
         } else {
             // The token has expired, so this next message will be rejected
             try {