You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:33:58 UTC
[pulsar] 12/38: Ensure that all dangling consumers are cleaned up during failures (#6778)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3cfa938bfe06ef12839df7754260f7e0b67e42fa
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Apr 21 17:21:40 2020 -0700
Ensure that all dangling consumers are cleaned up during failures (#6778)
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
(cherry picked from commit 98b818b5fa63ee2e4a67887cf96330ae652dafa2)
---
.../pulsar/functions/source/PulsarSource.java | 25 +++--
.../pulsar/functions/source/PulsarSourceTest.java | 113 ++++++++++++++++-----
2 files changed, 103 insertions(+), 35 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 0d23ce1..fa7146d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -47,7 +47,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
private final Map<String, String> properties;
private final ClassLoader functionClassLoader;
private List<String> inputTopics;
- private List<Consumer<T>> inputConsumers = Collections.emptyList();
+ private List<Consumer<T>> inputConsumers = new LinkedList<>();
private final TopicSchema topicSchema;
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties,
@@ -65,7 +65,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs();
- inputConsumers = configs.entrySet().stream().map(e -> {
+ for (Map.Entry<String, ConsumerConfig<T>> e : configs.entrySet()) {
String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue();
log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}",
@@ -80,17 +80,17 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
.messageListener(this);
if (conf.isRegexPattern) {
- cb.topicsPattern(topic);
+ cb = cb.topicsPattern(topic);
} else {
- cb.topic(topic);
+ cb = cb.topics(Collections.singletonList(topic));
}
if (conf.getReceiverQueueSize() != null) {
- cb.receiverQueueSize(conf.getReceiverQueueSize());
+ cb = cb.receiverQueueSize(conf.getReceiverQueueSize());
}
- cb.properties(properties);
+ cb = cb.properties(properties);
if (pulsarSourceConfig.getTimeoutMs() != null) {
- cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
+ cb = cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
}
if (pulsarSourceConfig.getMaxMessageRetries() != null && pulsarSourceConfig.getMaxMessageRetries() >= 0) {
@@ -99,11 +99,12 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic());
}
- cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
+ cb = cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
}
- return cb.subscribeAsync();
- }).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
+ Consumer<T> consumer = cb.subscribeAsync().join();
+ inputConsumers.add(consumer);
+ }
inputTopics = inputConsumers.stream().flatMap(c -> {
return (c instanceof MultiTopicsConsumerImpl) ? ((MultiTopicsConsumerImpl<?>) c).getTopics().stream()
@@ -176,6 +177,10 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
return inputTopics;
}
+ public List<Consumer<T>> getInputConsumers() {
+ return inputConsumers;
+ }
+
@Data
@Builder
private static class ConsumerConfig<T> {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index d6e03d3..c2e556c 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -20,19 +20,19 @@ package org.apache.pulsar.functions.source;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import lombok.Cleanup;
import lombok.Getter;
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.io.core.SourceContext;
+import org.mockito.ArgumentMatcher;
import org.testng.annotations.Test;
@Slf4j
@@ -56,11 +57,21 @@ public class PulsarSourceTest {
private static Map<String, ConsumerConfig> consumerConfigs = new HashMap<>();
static {
- consumerConfigs.put("persistent://sample/standalone/ns1/test_result", ConsumerConfig.builder()
+ consumerConfigs.put("persistent://sample/ns1/test_result", ConsumerConfig.builder()
.serdeClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
}
- public static class TestSerDe implements SerDe<String> {
+ private static Map<String, ConsumerConfig> multipleConsumerConfigs = new HashMap<>();
+ static {
+ multipleConsumerConfigs.put("persistent://sample/ns1/test_result1", ConsumerConfig.builder()
+ .serdeClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
+ multipleConsumerConfigs.put("persistent://sample/ns1/test_result2", ConsumerConfig.builder()
+ .serdeClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
+ multipleConsumerConfigs.put("persistent://sample/ns1/test_result3", ConsumerConfig.builder()
+ .serdeClassName(TopicSchema.DEFAULT_SERDE).isRegexPattern(false).build());
+ }
+
+ public static class TestSerDe implements SerDe<String> {
@Override
public String deserialize(byte[] input) {
@@ -78,26 +89,61 @@ public class PulsarSourceTest {
*/
private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
- ConsumerBuilder<?> consumerBuilder = mock(ConsumerBuilder.class);
- doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
- doReturn(consumerBuilder).when(consumerBuilder).cryptoFailureAction(any());
- doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(any());
- doReturn(consumerBuilder).when(consumerBuilder).subscriptionInitialPosition(any());
- doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
- doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
- doReturn(consumerBuilder).when(consumerBuilder).messageListener(any());
+ ConsumerBuilder<?> goodConsumerBuilder = mock(ConsumerBuilder.class);
+ ConsumerBuilder<?> badConsumerBuilder = mock(ConsumerBuilder.class);
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).topics(argThat(new TopicMatcher("persistent://sample/ns1/test_result")));
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).topics(argThat(new TopicMatcher("persistent://sample/ns1/test_result1")));
+ doReturn(badConsumerBuilder).when(goodConsumerBuilder).topics(argThat(new TopicMatcher("persistent://sample/ns1/test_result2")));
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).topics(argThat(new TopicMatcher("persistent://sample/ns1/test_result3")));
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).cryptoFailureAction(any());
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).subscriptionName(any());
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).subscriptionInitialPosition(any());
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).subscriptionType(any());
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).ackTimeout(anyLong(), any());
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).messageListener(any());
+ doReturn(goodConsumerBuilder).when(goodConsumerBuilder).properties(any());
+ doReturn(badConsumerBuilder).when(badConsumerBuilder).cryptoFailureAction(any());
+ doReturn(badConsumerBuilder).when(badConsumerBuilder).subscriptionName(any());
+ doReturn(badConsumerBuilder).when(badConsumerBuilder).subscriptionInitialPosition(any());
+ doReturn(badConsumerBuilder).when(badConsumerBuilder).subscriptionType(any());
+ doReturn(badConsumerBuilder).when(badConsumerBuilder).ackTimeout(anyLong(), any());
+ doReturn(badConsumerBuilder).when(badConsumerBuilder).messageListener(any());
+ doReturn(badConsumerBuilder).when(badConsumerBuilder).properties(any());
+
Consumer<?> consumer = mock(Consumer.class);
- doReturn(consumer).when(consumerBuilder).subscribe();
- doReturn(consumerBuilder).when(pulsarClient).newConsumer(any());
- doReturn(CompletableFuture.completedFuture(consumer)).when(consumerBuilder).subscribeAsync();
+ doReturn(consumer).when(goodConsumerBuilder).subscribe();
+ doReturn(goodConsumerBuilder).when(pulsarClient).newConsumer(any());
+ doReturn(CompletableFuture.completedFuture(consumer)).when(goodConsumerBuilder).subscribeAsync();
+ CompletableFuture<Consumer<?>> badFuture = new CompletableFuture<>();
+ badFuture.completeExceptionally(new PulsarClientException("Some Error"));
+ doReturn(badFuture).when(badConsumerBuilder).subscribeAsync();
+ doThrow(PulsarClientException.class).when(badConsumerBuilder).subscribe();
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(pulsarClient).getSchema(anyString());
return pulsarClient;
}
- private static PulsarSourceConfig getPulsarConfigs() {
+ private static class TopicMatcher implements ArgumentMatcher<List<String>> {
+ private final String topic;
+
+ public TopicMatcher(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public boolean matches(List<String> arg) {
+ return arg.contains(topic);
+ }
+ }
+
+
+ private static PulsarSourceConfig getPulsarConfigs(boolean multiple) {
PulsarSourceConfig pulsarConfig = new PulsarSourceConfig();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- pulsarConfig.setTopicSchema(consumerConfigs);
+ if (multiple) {
+ pulsarConfig.setTopicSchema(multipleConsumerConfigs);
+ } else {
+ pulsarConfig.setTopicSchema(consumerConfigs);
+ }
pulsarConfig.setTypeClassName(String.class.getName());
pulsarConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
pulsarConfig.setSubscriptionType(SubscriptionType.Shared);
@@ -126,7 +172,7 @@ public class PulsarSourceTest {
@Test
public void testVoidInputClasses() throws Exception {
- PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+ PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
// set type to void
pulsarConfig.setTypeClassName(Void.class.getName());
@@ -150,11 +196,11 @@ public class PulsarSourceTest {
*/
@Test
public void testInconsistentInputType() throws Exception {
- PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+ PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
// set type to be inconsistent to that of SerDe
pulsarConfig.setTypeClassName(Integer.class.getName());
Map<String, ConsumerConfig> topicSerdeClassNameMap = new HashMap<>();
- topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
+ topicSerdeClassNameMap.put("persistent://sample/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build());
pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
@@ -178,10 +224,10 @@ public class PulsarSourceTest {
@Test
public void testDefaultSerDe() throws Exception {
- PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+ PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
// set type to void
pulsarConfig.setTypeClassName(String.class.getName());
- consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
+ consumerConfigs.put("persistent://sample/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
pulsarConfig.setTopicSchema(consumerConfigs);
@@ -193,10 +239,10 @@ public class PulsarSourceTest {
@Test
public void testComplexOuputType() throws Exception {
- PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+ PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
// set type to void
pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
- consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
+ consumerConfigs.put("persistent://sample/ns1/test_result",
ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build());
pulsarConfig.setTopicSchema(consumerConfigs);
@@ -205,4 +251,21 @@ public class PulsarSourceTest {
pulsarSource.setupConsumerConfigs();
}
+
+ @Test
+ public void testDanglingSubscriptions() throws Exception {
+ PulsarSourceConfig pulsarConfig = getPulsarConfigs(true);
+
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
+ try {
+ pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
+ fail();
+ } catch (CompletionException e) {
+ pulsarSource.close();
+ assertEquals(pulsarSource.getInputConsumers().size(), 1);
+ } catch (Exception e) {
+ fail();
+ }
+
+ }
}