You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/06 00:42:23 UTC
[pulsar] branch master updated: Add PulsarClient#isClosed method
(#8428)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d7bac05 Add PulsarClient#isClosed method (#8428)
d7bac05 is described below
commit d7bac0571c70876d7939ee1d2efaeaa3a7581517
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Fri Nov 6 01:42:00 2020 +0100
Add PulsarClient#isClosed method (#8428)
Currently there is no way to know whether the Pulsar client has already been closed, which results in AlreadyClosedException errors.
---
.../org/apache/pulsar/client/api/PulsarClient.java | 9 ++++
.../pulsar/client/impl/PulsarClientImpl.java | 30 ++++++++++----
.../pulsar/client/impl/PulsarClientImplTest.java | 48 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 9 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index fdbf41e..fa2254c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -282,4 +282,13 @@ public interface PulsarClient extends Closeable {
* if the forceful shutdown fails
*/
void shutdown() throws PulsarClientException;
+
+ /**
+ * Return internal state of the client. Useful if you want to check that current client is valid.
+ * @return true if the client has been closed
+ * @see #shutdown()
+ * @see #close()
+ * @see #closeAsync()
+ */
+ boolean isClosed();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 64f1ef9..3c0b105 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -41,10 +41,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -295,7 +293,7 @@ public class PulsarClientImpl implements PulsarClient {
} else {
producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
}
-
+
producers.add(producer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}", topic, ex.getMessage());
@@ -385,7 +383,7 @@ public class PulsarClientImpl implements PulsarClient {
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}
-
+
consumers.add(consumer);
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned topic metadata", topic, ex);
@@ -402,7 +400,7 @@ public class PulsarClientImpl implements PulsarClient {
ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors,
true /* createTopicIfDoesNotExist */);
-
+
consumers.add(consumer);
return consumerSubscribedFuture;
@@ -436,7 +434,7 @@ public class PulsarClientImpl implements PulsarClient {
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture,
schema, subscriptionMode, interceptors);
-
+
consumers.add(consumer);
})
.exceptionally(ex -> {
@@ -508,7 +506,7 @@ public class PulsarClientImpl implements PulsarClient {
// gets the next single threaded executor from the list of executors
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
ReaderImpl<T> reader = new ReaderImpl<>(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, schema);
-
+
consumers.add(reader.getConsumer());
consumerSubscribedFuture.thenRun(() -> {
@@ -548,8 +546,16 @@ public class PulsarClientImpl implements PulsarClient {
public void close() throws PulsarClientException {
try {
closeAsync().get();
- } catch (Exception e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw PulsarClientException.unwrap(e);
+ } catch (ExecutionException e) {
+ PulsarClientException unwrapped = PulsarClientException.unwrap(e);
+ if (unwrapped instanceof PulsarClientException.AlreadyClosedException) {
+ // this is not a problem
+ return;
+ }
+ throw unwrapped;
}
}
@@ -562,7 +568,7 @@ public class PulsarClientImpl implements PulsarClient {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
+
producers.forEach(p -> futures.add(p.closeAsync()));
consumers.forEach(c -> futures.add(c.closeAsync()));
@@ -603,6 +609,12 @@ public class PulsarClientImpl implements PulsarClient {
}
@Override
+ public boolean isClosed() {
+ State currentState = state.get();
+ return currentState == State.Closed || currentState == State.Closing;
+ }
+
+ @Override
public synchronized void updateServiceUrl(String serviceUrl) throws PulsarClientException {
log.info("Updating service URL to {}", serviceUrl);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
new file mode 100644
index 0000000..2d114d0
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.ThreadFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.testng.annotations.Test;
+
+/**
+ * PulsarClientImpl unit tests.
+ */
+public class PulsarClientImplTest {
+
+ @Test
+ public void testIsClosed() throws Exception {
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setServiceUrl("pulsar://localhost:6650");
+ ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+ EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+ PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+ assertFalse(clientImpl.isClosed());
+ clientImpl.close();
+ assertTrue(clientImpl.isClosed());
+ eventLoopGroup.shutdownGracefully().get();
+ }
+
+}