You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 13:39:51 UTC
[pulsar] 02/03: [Transaction] Fix close pulsarClient then close
transaction client connection (#12689)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fff54d5593471bf38de915d9f72bae5e988226a7
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Nov 9 19:34:14 2021 +0800
[Transaction] Fix close pulsarClient then close transaction client connection (#12689)
(cherry picked from commit 6162ccf34aa29e9495bf2f0cdc7e88e9e8c1d067)
---
...Test.java => TransactionClientConnectTest.java} | 29 +++++++++++++++++++++-
.../broker/transaction/TransactionTestBase.java | 2 +-
.../pulsar/client/impl/ConnectionHandler.java | 16 +++++++-----
.../apache/pulsar/client/impl/HandlerState.java | 7 ++++++
.../pulsar/client/impl/PulsarClientImpl.java | 16 ++++++------
.../client/impl/TransactionMetaStoreHandler.java | 7 ++++++
6 files changed, 61 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
similarity index 89%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
index 1f5ab15..42eadfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -39,6 +40,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -47,7 +49,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.FileAssert.fail;
-public class TransactionClientReconnectTest extends TransactionTestBase {
+public class TransactionClientConnectTest extends TransactionTestBase {
private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test";
private static final int NUM_PARTITIONS = 1;
@@ -223,6 +225,31 @@ public class TransactionClientReconnectTest extends TransactionTestBase {
reconnect();
}
+ @Test
+ public void testPulsarClientCloseThenCloseTcClient() throws Exception {
+ TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
+ Field field = TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
+ field.setAccessible(true);
+ TransactionMetaStoreHandler[] handlers =
+ (TransactionMetaStoreHandler[]) field.get(transactionCoordinatorClient);
+
+ for (TransactionMetaStoreHandler handler : handlers) {
+ handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
+ }
+ pulsarClient.close();
+ for (TransactionMetaStoreHandler handler : handlers) {
+ Method method = TransactionMetaStoreHandler.class.getMethod("getConnectHandleState");
+ method.setAccessible(true);
+ assertEquals(method.invoke(handler).toString(), "Closed");
+ try {
+ handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
+ } catch (ExecutionException | InterruptedException e) {
+ assertTrue(e.getCause()
+ instanceof TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+ }
+ }
+ }
+
public void start() throws Exception {
// wait transaction coordinator init success
Awaitility.await().until(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 622421b..1dba73a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -246,7 +246,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
admin = null;
}
if (pulsarClient != null) {
- pulsarClient.shutdown();
+ pulsarClient.close();
pulsarClient = null;
}
if (pulsarServiceList.size() > 0) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 8fb7ab4..9babd2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -103,12 +103,16 @@ public class ConnectionHandler {
long delayMs = backoff.next();
log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", state.topic, state.getHandlerName(),
exception.getMessage(), delayMs / 1000.0);
- state.setState(State.Connecting);
- state.client.timer().newTimeout(timeout -> {
- log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
- incrementEpoch();
- grabCnx();
- }, delayMs, TimeUnit.MILLISECONDS);
+ if (state.changeToConnecting()) {
+ state.client.timer().newTimeout(timeout -> {
+ log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
+ incrementEpoch();
+ grabCnx();
+ }, delayMs, TimeUnit.MILLISECONDS);
+ } else {
+ log.info("[{}] [{}] Ignoring reconnection request (state: {})",
+ state.topic, state.getHandlerName(), state.getState());
+ }
}
protected long incrementEpoch() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index e72c97f..582df8c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -64,6 +64,13 @@ abstract class HandlerState {
return STATE_UPDATER.get(this);
}
+ protected boolean changeToConnecting() {
+ return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Connecting)
+ || STATE_UPDATER.compareAndSet(this, State.Ready, State.Connecting)
+ || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting)
+ || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Connecting));
+ }
+
protected void setState(State s) {
STATE_UPDATER.set(this, s);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1234b8b..7703afc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -712,6 +712,14 @@ public class PulsarClientImpl implements PulsarClient {
throwable = t;
}
}
+ if (tcClient != null) {
+ try {
+ tcClient.close();
+ } catch (Throwable t) {
+ log.warn("Failed to close tcClient");
+ throwable = t;
+ }
+ }
try {
// Shutting down eventLoopGroup separately because in some cases, cnxPool might be using different
// eventLoopGroup.
@@ -747,14 +755,6 @@ public class PulsarClientImpl implements PulsarClient {
throwable = t;
}
}
- if (tcClient != null) {
- try {
- tcClient.close();
- } catch (Throwable t) {
- log.warn("Failed to close tcClient");
- throwable = t;
- }
- }
if (throwable != null) {
throw throwable;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index f96cf57..ba6ee50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
@@ -540,6 +541,12 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
@Override
public void close() throws IOException {
this.requestTimeout.cancel();
+ this.setState(State.Closed);
+ }
+
+ @VisibleForTesting
+ public State getConnectHandleState() {
+ return getState();
}
@Override