You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/02/12 20:57:17 UTC

[pulsar] branch master updated: [client] Cleanup consumer on multitopic subscribe failure (#9419)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cbe9816  [client] Cleanup consumer on multitopic subscribe failure (#9419)
cbe9816 is described below

commit cbe9816423a23686bdd2091192f5aa5f8bda152d
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Fri Feb 12 13:56:32 2021 -0700

    [client] Cleanup consumer on multitopic subscribe failure (#9419)
    
    Currently, when a multi-topic subscribe fails (via a set of topics or a
    regex) we can leave consumers connected, as the multitopic consumer
    doesn't close any of the topics.
    
    This means we rely on the client to call closeAsync, otherwise, the
    consumer is left in partially open state.
    
    This fix changes that, and ensures we call close in the case of an
    exception
    
    Co-authored-by: Sijie Guo <si...@apache.org>
---
 .../client/impl/MultiTopicsConsumerImpl.java       |  9 ++++--
 .../pulsar/client/impl/ClientTestFixtures.java     |  6 ++++
 .../client/impl/MultiTopicsConsumerImplTest.java   | 35 ++++++++++++++++++++++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6cf6d3d..2c81d0a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -190,8 +190,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 subscribeFuture().complete(MultiTopicsConsumerImpl.this);
             })
             .exceptionally(ex -> {
-                log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage());
-                subscribeFuture.completeExceptionally(ex);
+                log.warn("[{}] Failed to subscribe topics: {}, closing consumer", topic, ex.getMessage());
+                closeAsync().whenComplete((res, closeEx) -> {
+                    if (closeEx != null) {
+                        log.error("[{}] Failed to unsubscribe after failed consumer creation: {}", topic, closeEx.getMessage());
+                    }
+                    subscribeFuture.completeExceptionally(ex);
+                });
                 return null;
             });
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index a1653c1..085a4a9 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -72,6 +72,12 @@ class ClientTestFixtures {
         return future;
     }
 
+    static <T> CompletableFuture<T> createExceptionFuture(Throwable ex, int delayMillis) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        SCHEDULER.schedule(() -> future.completeExceptionally(ex), delayMillis, TimeUnit.MILLISECONDS);
+        return future;
+    }
+
     public static ExecutorService createMockedExecutor() {
         return mock(ExecutorService.class);
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 767cb65..664e78a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import com.google.common.collect.Sets;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -31,13 +32,19 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.Test;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
 
 import static org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
+import static org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
 import static org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
@@ -137,4 +144,32 @@ public class MultiTopicsConsumerImplTest {
         assertFalse(consumer.hasPendingBatchReceive());
     }
 
+    @Test
+    public void testConsumerCleanupOnSubscribeFailure() throws InterruptedException, TimeoutException, ExecutionException {
+        ExecutorService listenerExecutor = mock(ExecutorService.class);
+        ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>();
+        consumerConfData.setSubscriptionName("subscriptionName");
+        consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", "c")));
+        int completionDelayMillis = 10;
+        Schema<byte[]> schema = Schema.BYTES;
+        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx();
+        when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture(
+                new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis));
+        CompletableFuture<Consumer<byte[]>> completeFuture = new CompletableFuture<>();
+        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+                completeFuture, schema, null, true);
+        // assert that we don't start in closed, then we move to closed and get an exception
+        // indicating that closeAsync was called
+        assertEquals(impl.getState(), HandlerState.State.Uninitialized);
+        try {
+            completeFuture.get(15, TimeUnit.MILLISECONDS);
+        } catch (Throwable ex) {
+            // just ignore the exception
+        }
+        assertTrue(completeFuture.isCompletedExceptionally());
+        assertEquals(impl.getConsumers().size(), 0);
+        assertEquals(impl.getState(), HandlerState.State.Closed);
+        verify(clientMock, times(1)).cleanupConsumer(any());
+    }
+
 }