You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:58 UTC

[pulsar] 12/38: Ensure that all dangling consumers are cleaned up during failures (#6778)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3cfa938bfe06ef12839df7754260f7e0b67e42fa
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Apr 21 17:21:40 2020 -0700

    Ensure that all dangling consumers are cleaned up during failures (#6778)
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>(cherry picked from commit 98b818b5fa63ee2e4a67887cf96330ae652dafa2)
---
 .../pulsar/functions/source/PulsarSource.java      |  25 +++--
 .../pulsar/functions/source/PulsarSourceTest.java  | 113 ++++++++++++++++-----
 2 files changed, 103 insertions(+), 35 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 0d23ce1..fa7146d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -47,7 +47,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
     private final Map<String, String> properties;
     private final ClassLoader functionClassLoader;
     private List<String> inputTopics;
-    private List<Consumer<T>> inputConsumers = Collections.emptyList();
+    private List<Consumer<T>> inputConsumers = new LinkedList<>();
     private final TopicSchema topicSchema;
 
     public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties,
@@ -65,7 +65,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
         log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
         Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs();
 
-        inputConsumers = configs.entrySet().stream().map(e -> {
+        for (Map.Entry<String, ConsumerConfig<T>> e : configs.entrySet()) {
             String topic = e.getKey();
             ConsumerConfig<T> conf = e.getValue();
             log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}",
@@ -80,17 +80,17 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
                     .messageListener(this);
 
             if (conf.isRegexPattern) {
-                cb.topicsPattern(topic);
+                cb = cb.topicsPattern(topic);
             } else {
-                cb.topic(topic);
+                cb = cb.topics(Collections.singletonList(topic));
             }
             if (conf.getReceiverQueueSize() != null) {
-                cb.receiverQueueSize(conf.getReceiverQueueSize());
+                cb = cb.receiverQueueSize(conf.getReceiverQueueSize());
             }
-            cb.properties(properties);
+            cb = cb.properties(properties);
 
             if (pulsarSourceConfig.getTimeoutMs() != null) {
-                cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
+                cb = cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
             }
 
             if (pulsarSourceConfig.getMaxMessageRetries() != null && pulsarSourceConfig.getMaxMessageRetries() >= 0) {
@@ -99,11 +99,12 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
                 if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
                     deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic());
                 }
-                cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
+                cb = cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
             }
 
-            return cb.subscribeAsync();
-        }).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
+            Consumer<T> consumer = cb.subscribeAsync().join();
+            inputConsumers.add(consumer);
+        }
 
         inputTopics = inputConsumers.stream().flatMap(c -> {
             return (c instanceof MultiTopicsConsumerImpl) ? ((MultiTopicsConsumerImpl<?>) c).getTopics().stream()
@@ -176,6 +177,10 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
         return inputTopics;
     }
 
+    public List<Consumer<T>> getInputConsumers() {
+        return inputConsumers;
+    }
+
     @Data
     @Builder
     private static class ConsumerConfig<T> {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index d6e03d3..c2e556c 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -20,19 +20,19 @@ package org.apache.pulsar.functions.source;
 
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 import lombok.Cleanup;
 import lombok.Getter;
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.ArgumentMatcher;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -56,11 +57,21 @@ public class PulsarSourceTest {
 
     private static Map<String, ConsumerConfig> consumerConfigs = new HashMap<>();
     static {
-        consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder()
+        consumerConfigs.put("persistent://sample/ns1/test_result", ConsumerConfig.builder()
                 .serdeClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
     }
 
-    public static class TestSerDe implements SerDe<String> {
+    private static Map<String, ConsumerConfig> multipleConsumerConfigs = new HashMap<>();
+    static {
+        multipleConsumerConfigs.put("persistent://sample/ns1/test_result1", ConsumerConfig.builder()
+                .serdeClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
+        multipleConsumerConfigs.put("persistent://sample/ns1/test_result2", ConsumerConfig.builder()
+                .serdeClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
+        multipleConsumerConfigs.put("persistent://sample/ns1/test_result3", ConsumerConfig.builder()
+                .serdeClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
+    }
+
+        public static class TestSerDe implements SerDe<String> {
 
         @Override
         public String deserialize(byte[] input) {
@@ -78,26 +89,61 @@ public class PulsarSourceTest {
      */
     private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
-        ConsumerBuilder<?> consumerBuilder = mock(ConsumerBuilder.class);
-        doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
-        doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any());
-        doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(any());
-        doReturn(consumerBuilder).when(consumerBuilder).subscriptionInitialPosition(any());
-        doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
-        doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
-        doReturn(consumerBuilder).when(consumerBuilder).messageListener(any());
+        ConsumerBuilder<?> goodConsumerBuilder = mock(ConsumerBuilder.class);
+        ConsumerBuilder<?> badConsumerBuilder = mock(ConsumerBuilder.class);
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).topics(argThat(new TopicMatcher("persistent://sample/ns1/test_result")));
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).topics(argThat(new TopicMatcher("persistent://sample/ns1/test_result1")));
+        doReturn(badConsumerBuilder).when(goodConsumerBuilder).topics(argThat(new TopicMatcher("persistent://sample/ns1/test_result2")));
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).topics(argThat(new TopicMatcher("persistent://sample/ns1/test_result3")));
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).cryptoFailureAction(any());
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).subscriptionName(any());
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).subscriptionInitialPosition(any());
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).subscriptionType(any());
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).ackTimeout(anyLong(), any());
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).messageListener(any());
+        doReturn(goodConsumerBuilder).when(goodConsumerBuilder).properties(any());
+        doReturn(badConsumerBuilder).when(badConsumerBuilder).cryptoFailureAction(any());
+        doReturn(badConsumerBuilder).when(badConsumerBuilder).subscriptionName(any());
+        doReturn(badConsumerBuilder).when(badConsumerBuilder).subscriptionInitialPosition(any());
+        doReturn(badConsumerBuilder).when(badConsumerBuilder).subscriptionType(any());
+        doReturn(badConsumerBuilder).when(badConsumerBuilder).ackTimeout(anyLong(), any());
+        doReturn(badConsumerBuilder).when(badConsumerBuilder).messageListener(any());
+        doReturn(badConsumerBuilder).when(badConsumerBuilder).properties(any());
+
         Consumer<?> consumer = mock(Consumer.class);
-        doReturn(consumer).when(consumerBuilder).subscribe();
-        doReturn(consumerBuilder).when(pulsarClient).newConsumer(any());
-        doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
+        doReturn(consumer).when(goodConsumerBuilder).subscribe();
+        doReturn(goodConsumerBuilder).when(pulsarClient).newConsumer(any());
+        doReturn(CompletableFuture.completedFuture(consumer)).when(goodConsumerBuilder).subscribeAsync();
+        CompletableFuture<Consumer<?>> badFuture = new CompletableFuture<>();
+        badFuture.completeExceptionally(new PulsarClientException("Some Error"));
+        doReturn(badFuture).when(badConsumerBuilder).subscribeAsync();
+        doThrow(PulsarClientException.class).when(badConsumerBuilder).subscribe();
         doReturn(CompletableFuture.completedFuture(Optional.empty())).when(pulsarClient).getSchema(anyString());
         return pulsarClient;
     }
 
-    private static PulsarSourceConfig getPulsarConfigs() {
+    private static class TopicMatcher implements ArgumentMatcher<List<String>> {
+        private final String topic;
+
+        public TopicMatcher(String topic) {
+            this.topic = topic;
+        }
+
+        @Override
+        public boolean matches(List<String> arg) {
+            return arg.contains(topic);
+        }
+    }
+
+
+    private static PulsarSourceConfig getPulsarConfigs(boolean multiple) {
         PulsarSourceConfig pulsarConfig = new PulsarSourceConfig();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        pulsarConfig.setTopicSchema(consumerConfigs);
+        if (multiple) {
+            pulsarConfig.setTopicSchema(multipleConsumerConfigs);
+        } else {
+            pulsarConfig.setTopicSchema(consumerConfigs);
+        }
         pulsarConfig.setTypeClassName(String.class.getName());
         pulsarConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
         pulsarConfig.setSubscriptionType(SubscriptionType.Shared);
@@ -126,7 +172,7 @@ public class PulsarSourceTest {
 
     @Test
     public void testVoidInputClasses() throws Exception {
-        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
 
@@ -150,11 +196,11 @@ public class PulsarSourceTest {
      */
     @Test
     public void testInconsistentInputType() throws Exception {
-        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
         Map<String, ConsumerConfig> topicSerdeClassNameMap = new HashMap<>();
-        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
+        topicSerdeClassNameMap.put("persistent://sample/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build());
         pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
 
@@ -178,10 +224,10 @@ public class PulsarSourceTest {
     @Test
     public void testDefaultSerDe() throws Exception {
 
-        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
+        consumerConfigs.put("persistent://sample/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
 
@@ -193,10 +239,10 @@ public class PulsarSourceTest {
 
     @Test
     public void testComplexOuputType() throws Exception {
-        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
         // set type to void
         pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
-        consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
+        consumerConfigs.put("persistent://sample/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
 
@@ -205,4 +251,21 @@ public class PulsarSourceTest {
 
         pulsarSource.setupConsumerConfigs();
     }
+
+    @Test
+    public void testDanglingSubscriptions() throws Exception {
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs(true);
+
+        PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
+        try {
+            pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
+            fail();
+        } catch (CompletionException e) {
+            pulsarSource.close();
+            assertEquals(pulsarSource.getInputConsumers().size(), 1);
+        } catch (Exception e) {
+            fail();
+        }
+
+    }
 }