You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/06 00:42:23 UTC

[pulsar] branch master updated: Add PulsarClient#isClosed method (#8428)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d7bac05  Add PulsarClient#isClosed method (#8428)
d7bac05 is described below

commit d7bac0571c70876d7939ee1d2efaeaa3a7581517
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Fri Nov 6 01:42:00 2020 +0100

    Add PulsarClient#isClosed method (#8428)
    
    Currently there is no way to know whether the Pulsar client has already been closed, resulting in AlreadyClosedException errors.
---
 .../org/apache/pulsar/client/api/PulsarClient.java |  9 ++++
 .../pulsar/client/impl/PulsarClientImpl.java       | 30 ++++++++++----
 .../pulsar/client/impl/PulsarClientImplTest.java   | 48 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index fdbf41e..fa2254c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -282,4 +282,13 @@ public interface PulsarClient extends Closeable {
      *             if the forceful shutdown fails
      */
     void shutdown() throws PulsarClientException;
+
+    /**
+     * Return the internal state of the client. Useful if you want to check that the current client is valid.
+     * @return true if the client has been closed
+     * @see #shutdown()
+     * @see #close()
+     * @see #closeAsync()
+     */
+    boolean isClosed();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 64f1ef9..3c0b105 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -41,10 +41,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -295,7 +293,7 @@ public class PulsarClientImpl implements PulsarClient {
             } else {
                 producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
             }
-    
+
             producers.add(producer);
         }).exceptionally(ex -> {
             log.warn("[{}] Failed to get partitioned topic metadata: {}", topic, ex.getMessage());
@@ -385,7 +383,7 @@ public class PulsarClientImpl implements PulsarClient {
                         consumerSubscribedFuture,null, schema, interceptors,
                         true /* createTopicIfDoesNotExist */);
             }
-            
+
             consumers.add(consumer);
         }).exceptionally(ex -> {
             log.warn("[{}] Failed to get partitioned topic metadata", topic, ex);
@@ -402,7 +400,7 @@ public class PulsarClientImpl implements PulsarClient {
         ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
                 externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors,
                 true /* createTopicIfDoesNotExist */);
-        
+
         consumers.add(consumer);
 
         return consumerSubscribedFuture;
@@ -436,7 +434,7 @@ public class PulsarClientImpl implements PulsarClient {
                     externalExecutorProvider.getExecutor(),
                     consumerSubscribedFuture,
                     schema, subscriptionMode, interceptors);
-                
+
                 consumers.add(consumer);
             })
             .exceptionally(ex -> {
@@ -508,7 +506,7 @@ public class PulsarClientImpl implements PulsarClient {
             // gets the next single threaded executor from the list of executors
             ExecutorService listenerThread = externalExecutorProvider.getExecutor();
             ReaderImpl<T> reader = new ReaderImpl<>(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture, schema);
-            
+
             consumers.add(reader.getConsumer());
 
             consumerSubscribedFuture.thenRun(() -> {
@@ -548,8 +546,16 @@ public class PulsarClientImpl implements PulsarClient {
     public void close() throws PulsarClientException {
         try {
             closeAsync().get();
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw PulsarClientException.unwrap(e);
+        } catch (ExecutionException e) {
+            PulsarClientException unwrapped = PulsarClientException.unwrap(e);
+            if (unwrapped instanceof PulsarClientException.AlreadyClosedException) {
+                // this is not a problem
+                return;
+            }
+            throw unwrapped;
         }
     }
 
@@ -562,7 +568,7 @@ public class PulsarClientImpl implements PulsarClient {
 
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futures = Lists.newArrayList();
-    
+
         producers.forEach(p -> futures.add(p.closeAsync()));
         consumers.forEach(c -> futures.add(c.closeAsync()));
 
@@ -603,6 +609,12 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     @Override
+    public boolean isClosed() {
+        State currentState = state.get();
+        return currentState == State.Closed || currentState == State.Closing;
+    }
+
+    @Override
     public synchronized void updateServiceUrl(String serviceUrl) throws PulsarClientException {
         log.info("Updating service URL to {}", serviceUrl);
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
new file mode 100644
index 0000000..2d114d0
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.ThreadFactory;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.testng.annotations.Test;
+
+/**
+ * PulsarClientImpl unit tests.
+ */
+public class PulsarClientImplTest {
+
+    @Test
+    public void testIsClosed() throws Exception {
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl("pulsar://localhost:6650");
+        ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+        PulsarClientImpl clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
+        assertFalse(clientImpl.isClosed());
+        clientImpl.close();
+        assertTrue(clientImpl.isClosed());
+        eventLoopGroup.shutdownGracefully().get();
+    }
+
+}