You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/28 05:43:17 UTC
[pulsar] branch master updated: Support multi-topic and
pattern-topic for PulsarConsumerSource in pulsar (#3256)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a010166 Support multi-topic and pattern-topic for PulsarConsumerSource in pulsar (#3256)
a010166 is described below
commit a010166f9e67587a065fbe50657a9c76fb7403a2
Author: penghui <co...@gmail.com>
AuthorDate: Fri Dec 28 13:43:12 2018 +0800
Support multi-topic and pattern-topic for PulsarConsumerSource in pulsar (#3256)
### Motivation
Fixes #3255
Make it easier to consume multiple topics with PulsarConsumerSource.
### Modifications
Add multi-topic and topic-pattern settings to PulsarSourceBuilder and PulsarConsumerSource.
### Result
UT passed.
---
.../connectors/pulsar/PulsarConsumerSource.java | 25 +++++--
.../connectors/pulsar/PulsarSourceBuilder.java | 79 +++++++++++++++++++---
.../connectors/pulsar/PulsarSourceBuilderTest.java | 30 +++++++-
3 files changed, 118 insertions(+), 16 deletions(-)
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 84e0e50..6479bf0 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.IOUtils;
@@ -41,6 +42,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
/**
* Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
@@ -55,7 +57,8 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
private final int messageReceiveTimeoutMs = 100;
private final String serviceUrl;
- private final String topic;
+ private final Set<String> topicNames;
+ private final Pattern topicsPattern;
private final String subscriptionName;
private final DeserializationSchema<T> deserializer;
@@ -72,7 +75,8 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
super(MessageId.class);
this.serviceUrl = builder.serviceUrl;
- this.topic = builder.topic;
+ this.topicNames = builder.topicNames;
+ this.topicsPattern = builder.topicsPattern;
this.deserializer = builder.deserializationSchema;
this.subscriptionName = builder.subscriptionName;
this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
@@ -191,10 +195,17 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
}
Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
- return client.newConsumer()
- .topic(topic)
- .subscriptionName(subscriptionName)
- .subscriptionType(SubscriptionType.Failover)
- .subscribe();
+ if (topicsPattern != null) {
+ return client.newConsumer().topicsPattern(topicsPattern)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+ } else {
+ return client.newConsumer()
+ .topics(Lists.newArrayList(topicNames))
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+ }
}
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 9605f07..7d06806 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -24,6 +24,12 @@ import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+
/**
* A class for building a pulsar source.
*/
@@ -36,7 +42,8 @@ public class PulsarSourceBuilder<T> {
final DeserializationSchema<T> deserializationSchema;
String serviceUrl = SERVICE_URL;
- String topic;
+ final Set<String> topicNames = new TreeSet<>();
+ Pattern topicsPattern;
String subscriptionName = "flink-sub";
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
@@ -57,18 +64,74 @@ public class PulsarSourceBuilder<T> {
}
/**
- * Sets the topic to consumer from. This is required.
+ * Adds topics to consume from. Either topics or a topics pattern is required.
+ *
+ * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+ * are in the following format:
+ * {persistent|non-persistent}://tenant/namespace/topic
+ *
+ * @param topics the topics to consume from
+ * @return this builder
+ */
+ public PulsarSourceBuilder<T> topic(String... topics) {
+ Preconditions.checkArgument(topics != null && topics.length > 0,
+ "topics cannot be blank");
+ for (String topic : topics) {
+ Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic");
+ }
+ this.topicNames.addAll(Arrays.asList(topics));
+ return this;
+ }
+
+ /**
+ * Adds topics to consume from. Either topics or a topics pattern is required.
+ *
+ * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+ * are in the following format:
+ * {persistent|non-persistent}://tenant/namespace/topic
+ *
+ * @param topics the topics to consume from
+ * @return this builder
+ */
+ public PulsarSourceBuilder<T> topics(List<String> topics) {
+ Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank");
+ topics.forEach(topicName ->
+ Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
+ this.topicNames.addAll(topics);
+ return this;
+ }
+
+ /**
+ * Sets a pattern that selects the set of topics to consume from. It may be set only once.
+ *
+ * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+ * are in the following format:
+ * {persistent|non-persistent}://tenant/namespace/topic
+ *
+ * @param topicsPattern the topic pattern to consume from
+ * @return this builder
+ */
+ public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
+ Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
+ Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set.");
+ this.topicsPattern = topicsPattern;
+ return this;
+ }
+
+ /**
+ * Compiles a regular-expression string into the pattern of topics to consume from. It may be set only once.
*
* <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
* are in the following format:
* {persistent|non-persistent}://tenant/namespace/topic
*
- * @param topic the topic to consumer from
+ * @param topicsPattern the topic pattern string to consume from
* @return this builder
*/
- public PulsarSourceBuilder<T> topic(String topic) {
- Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
- this.topic = topic;
+ public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
+ Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
+ Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set.");
+ this.topicsPattern = Pattern.compile(topicsPattern);
return this;
}
@@ -101,9 +164,9 @@ public class PulsarSourceBuilder<T> {
public SourceFunction<T> build() {
Preconditions.checkNotNull(serviceUrl, "a service url is required");
- Preconditions.checkNotNull(topic, "a topic is required");
+ Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null,
+ "At least one topic or topics pattern is required");
Preconditions.checkNotNull(subscriptionName, "a subscription name is required");
-
return new PulsarConsumerSource<>(this);
}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index 5a916e8..b6b159d 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -26,6 +26,8 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.regex.Pattern;
/**
* Tests for PulsarSourceBuilder
@@ -49,7 +51,7 @@ public class PulsarSourceBuilderTest {
Assert.assertNotNull(sourceFunction);
}
- @Test(expected = NullPointerException.class)
+ @Test(expected = IllegalArgumentException.class)
public void testBuildWithoutSettingRequiredProperties() {
pulsarSourceBuilder.build();
}
@@ -75,6 +77,32 @@ public class PulsarSourceBuilderTest {
}
@Test(expected = IllegalArgumentException.class)
+ public void testTopicsWithNull() {
+ pulsarSourceBuilder.topics(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicsWithBlank() {
+ pulsarSourceBuilder.topics(Arrays.asList(" ", " "));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicPatternWithNull() {
+ pulsarSourceBuilder.topicsPattern(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicPatternAlreadySet() {
+ pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-*"));
+ pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-my-*"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicPattenStringWithNull() {
+ pulsarSourceBuilder.topicsPatternString(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
public void testSubscriptionNameWithNull() {
pulsarSourceBuilder.subscriptionName(null);
}