You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/06/22 06:51:33 UTC

[pulsar] branch master updated: [fix][client] Fix changeToReadyState of HandlerState (#15722)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 415c6ed244b [fix][client] Fix changeToReadyState of HandlerState (#15722)
415c6ed244b is described below

commit 415c6ed244b2c0bd53c8884b258528cae7b175ce
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Jun 22 14:51:22 2022 +0800

    [fix][client] Fix changeToReadyState of HandlerState (#15722)
    
    * [fix][client] Fix changeToReadyState of HandlerState
    ### Motivation
    When connecting timeout, connectionHandler will reconnect. And then the connectionHandler may receive two response. When the second response arrives, the handler state is the ready state, and then we should allow  handlerState to change state to from Ready to Ready.
    ### Modification
    Allow  handlerState to change state to from Ready to Ready by changeToReadyState method.
---
 .../pulsar/client/impl/TransactionClientConnectTest.java   | 14 ++++++++++++++
 .../java/org/apache/pulsar/client/impl/HandlerState.java   |  9 +++++++--
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
index db8a117d4ec..30d608c458f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -159,6 +160,19 @@ public class TransactionClientConnectTest extends TransactionTestBase {
         }
     }
 
+    @Test
+    public void testHandlerStateChangeToReady() throws Exception {
+        TransactionCoordinatorClientImpl transactionCoordinatorClient =
+                ((PulsarClientImpl) pulsarClient).getTcClient();
+        Field field = TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
+        field.setAccessible(true);
+        TransactionMetaStoreHandler[] handlers =
+                (TransactionMetaStoreHandler[]) field.get(transactionCoordinatorClient);
+        TransactionMetaStoreHandler transactionMetaStoreHandler = handlers[0];
+        Assert.assertEquals(transactionMetaStoreHandler.getConnectHandleState(), HandlerState.State.Ready);
+        Assert.assertTrue(transactionMetaStoreHandler.changeToReadyState());
+    }
+
     public void start() throws Exception {
         // wait transaction coordinator init success
         pulsarClient.newTransaction()
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index 582df8c112b..822ba411b71 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -51,6 +51,9 @@ abstract class HandlerState {
 
     // moves the state to ready if it wasn't closed
     protected boolean changeToReadyState() {
+        if (STATE_UPDATER.get(this) == State.Ready) {
+            return true;
+        }
         return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Ready)
                 || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Ready)
                 || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Ready));
@@ -65,10 +68,12 @@ abstract class HandlerState {
     }
 
     protected boolean changeToConnecting() {
+        if (STATE_UPDATER.get(this) == State.Connecting) {
+            return true;
+        }
         return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Connecting)
                 || STATE_UPDATER.compareAndSet(this, State.Ready, State.Connecting)
-                || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting)
-                || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Connecting));
+                || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting));
     }
 
     protected void setState(State s) {