You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/13 06:10:51 UTC
[pulsar] 10/17: [Flink-Connector]Get PulsarClient from cache should
always return an open instance (#6436)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 462453bd7eb297c4117e98de65f55b90c63428c8
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Sun Mar 1 17:28:56 2020 +0800
[Flink-Connector]Get PulsarClient from cache should always return an open instance (#6436)
(cherry picked from commit 2ed2eb86e50d4515bee570c339b2719614a86ecc)
---
.../pulsar/client/impl/PulsarClientImpl.java | 6 +++++-
.../connectors/pulsar/CachedPulsarClient.java | 8 ++++++-
.../connectors/pulsar/CachedPulsarClientTest.java | 25 ++++++++++++++++++++++
3 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f51fb6b..db66c90 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -94,7 +94,7 @@ public class PulsarClientImpl implements PulsarClient {
private final Timer timer;
private final ExecutorProvider externalExecutorProvider;
- enum State {
+ public enum State {
Open, Closing, Closed
}
@@ -167,6 +167,10 @@ public class PulsarClientImpl implements PulsarClient {
return clientClock;
}
+ public AtomicReference<State> getState() {
+ return state;
+ }
+
@Override
public ProducerBuilder<byte[]> newProducer() {
return new ProducerBuilderImpl<>(this, Schema.BYTES);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
index 613d4cc..facfbef 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
@@ -79,7 +79,13 @@ public class CachedPulsarClient {
}
public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
- return guavaCache.get(config);
+ PulsarClientImpl instance = guavaCache.get(config);
+ if (instance.getState().get() == PulsarClientImpl.State.Open) {
+ return instance;
+ } else {
+ guavaCache.invalidate(config);
+ return guavaCache.get(config);
+ }
}
private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
index a41609f..39cdca1 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
@@ -100,4 +100,29 @@ public class CachedPulsarClientTest {
assertEquals(map2.values().iterator().next(), client1);
}
+
+ @Test
+ public void getClientFromCacheShouldAlwaysReturnAnOpenedInstance() throws Exception {
+ PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
+
+ ClientConfigurationData conf1 = new ClientConfigurationData();
+ conf1.setServiceUrl(SERVICE_URL);
+
+ PowerMockito.whenNew(PulsarClientImpl.class)
+ .withArguments(conf1).thenReturn(impl1);
+
+ PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
+
+ ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
+ assertEquals(map1.size(), 1);
+
+ client1.getState().set(PulsarClientImpl.State.Closed);
+
+ PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf1);
+
+ assertNotEquals(client1, client2);
+
+ ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
+ assertEquals(map2.size(), 1);
+ }
}