You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 13:39:51 UTC

[pulsar] 02/03: [Transaction] Fix close pulsarClient then close transaction client connection (#12689)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fff54d5593471bf38de915d9f72bae5e988226a7
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Nov 9 19:34:14 2021 +0800

    [Transaction] Fix close pulsarClient then close transaction client connection (#12689)
    
    (cherry picked from commit 6162ccf34aa29e9495bf2f0cdc7e88e9e8c1d067)
---
 ...Test.java => TransactionClientConnectTest.java} | 29 +++++++++++++++++++++-
 .../broker/transaction/TransactionTestBase.java    |  2 +-
 .../pulsar/client/impl/ConnectionHandler.java      | 16 +++++++-----
 .../apache/pulsar/client/impl/HandlerState.java    |  7 ++++++
 .../pulsar/client/impl/PulsarClientImpl.java       | 16 ++++++------
 .../client/impl/TransactionMetaStoreHandler.java   |  7 ++++++
 6 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
similarity index 89%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
index 1f5ab15..42eadfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
 import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -39,6 +40,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -47,7 +49,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.FileAssert.fail;
 
-public class TransactionClientReconnectTest extends TransactionTestBase {
+public class TransactionClientConnectTest extends TransactionTestBase {
 
     private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test";
     private static final int NUM_PARTITIONS = 1;
@@ -223,6 +225,31 @@ public class TransactionClientReconnectTest extends TransactionTestBase {
         reconnect();
     }
 
+    @Test
+    public void testPulsarClientCloseThenCloseTcClient() throws Exception {
+        TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
+        Field field = TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
+        field.setAccessible(true);
+        TransactionMetaStoreHandler[] handlers =
+                (TransactionMetaStoreHandler[]) field.get(transactionCoordinatorClient);
+
+        for (TransactionMetaStoreHandler handler : handlers) {
+            handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
+        }
+        pulsarClient.close();
+        for (TransactionMetaStoreHandler handler : handlers) {
+            Method method = TransactionMetaStoreHandler.class.getMethod("getConnectHandleState");
+            method.setAccessible(true);
+            assertEquals(method.invoke(handler).toString(), "Closed");
+            try {
+                handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
+            } catch (ExecutionException | InterruptedException e) {
+                assertTrue(e.getCause()
+                        instanceof TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+            }
+        }
+    }
+
     public void start() throws Exception {
         // wait transaction coordinator init success
         Awaitility.await().until(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 622421b..1dba73a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -246,7 +246,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
                 admin = null;
             }
             if (pulsarClient != null) {
-                pulsarClient.shutdown();
+                pulsarClient.close();
                 pulsarClient = null;
             }
             if (pulsarServiceList.size() > 0) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 8fb7ab4..9babd2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -103,12 +103,16 @@ public class ConnectionHandler {
         long delayMs = backoff.next();
         log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", state.topic, state.getHandlerName(),
                 exception.getMessage(), delayMs / 1000.0);
-        state.setState(State.Connecting);
-        state.client.timer().newTimeout(timeout -> {
-            log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
-            incrementEpoch();
-            grabCnx();
-        }, delayMs, TimeUnit.MILLISECONDS);
+        if (state.changeToConnecting()) {
+            state.client.timer().newTimeout(timeout -> {
+                log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
+                incrementEpoch();
+                grabCnx();
+            }, delayMs, TimeUnit.MILLISECONDS);
+        } else {
+            log.info("[{}] [{}] Ignoring reconnection request (state: {})",
+                    state.topic, state.getHandlerName(), state.getState());
+        }
     }
 
     protected long incrementEpoch() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index e72c97f..582df8c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -64,6 +64,13 @@ abstract class HandlerState {
         return STATE_UPDATER.get(this);
     }
 
+    protected boolean changeToConnecting() { // true iff one of the CAS attempts below succeeds
+        return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.Ready, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Connecting)); // no-op, still true
+    }
+
     protected void setState(State s) {
         STATE_UPDATER.set(this, s);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1234b8b..7703afc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -712,6 +712,14 @@ public class PulsarClientImpl implements PulsarClient {
                     throwable = t;
                 }
             }
+            if (tcClient != null) {
+                try {
+                    tcClient.close();
+                } catch (Throwable t) {
+                    log.warn("Failed to close tcClient");
+                    throwable = t;
+                }
+            }
             try {
                 // Shutting down eventLoopGroup separately because in some cases, cnxPool might be using different
                 // eventLoopGroup.
@@ -747,14 +755,6 @@ public class PulsarClientImpl implements PulsarClient {
                     throwable = t;
                 }
             }
-            if (tcClient != null) {
-                try {
-                    tcClient.close();
-                } catch (Throwable t) {
-                    log.warn("Failed to close tcClient");
-                    throwable = t;
-                }
-            }
             if (throwable != null) {
                 throw throwable;
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index f96cf57..ba6ee50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
@@ -540,6 +541,12 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
     @Override
     public void close() throws IOException {
         this.requestTimeout.cancel();
+        this.setState(State.Closed); // changeToConnecting() has no transition out of Closed
+    }
+
+    @VisibleForTesting
+    public State getConnectHandleState() { // public accessor that simply delegates to getState()
+        return getState();
     }
 
     @Override