You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/28 05:43:14 UTC

[GitHub] sijie closed pull request #3256: Support multi-topic and pattern-topic for PulsarConsumerSource in pulsar

sijie closed pull request #3256: Support multi-topic and pattern-topic for PulsarConsumerSource in pulsar
URL: https://github.com/apache/pulsar/pull/3256
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 84e0e50664..6479bf0b1a 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.util.IOUtils;
@@ -41,6 +42,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 /**
  * Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
@@ -55,7 +57,8 @@
 
     private final int messageReceiveTimeoutMs = 100;
     private final String serviceUrl;
-    private final String topic;
+    private final Set<String> topicNames;
+    private final Pattern topicsPattern;
     private final String subscriptionName;
     private final DeserializationSchema<T> deserializer;
 
@@ -72,7 +75,8 @@
     PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
         super(MessageId.class);
         this.serviceUrl = builder.serviceUrl;
-        this.topic = builder.topic;
+        this.topicNames = builder.topicNames;
+        this.topicsPattern = builder.topicsPattern;
         this.deserializer = builder.deserializationSchema;
         this.subscriptionName = builder.subscriptionName;
         this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
@@ -191,10 +195,17 @@ PulsarClient createClient() throws PulsarClientException {
     }
 
     Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
-        return client.newConsumer()
-            .topic(topic)
-            .subscriptionName(subscriptionName)
-            .subscriptionType(SubscriptionType.Failover)
-            .subscribe();
+        if (topicsPattern != null) {
+            return client.newConsumer().topicsPattern(topicsPattern)
+                    .subscriptionName(subscriptionName)
+                    .subscriptionType(SubscriptionType.Failover)
+                    .subscribe();
+        } else {
+            return client.newConsumer()
+                    .topics(Lists.newArrayList(topicNames))
+                    .subscriptionName(subscriptionName)
+                    .subscriptionType(SubscriptionType.Failover)
+                    .subscribe();
+        }
     }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 9605f079a3..7d06806d60 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -24,6 +24,12 @@
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+
 /**
  * A class for building a pulsar source.
  */
@@ -36,7 +42,8 @@
 
     final DeserializationSchema<T> deserializationSchema;
     String serviceUrl = SERVICE_URL;
-    String topic;
+    final Set<String> topicNames = new TreeSet<>();
+    Pattern topicsPattern;
     String subscriptionName = "flink-sub";
     long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
 
@@ -57,18 +64,74 @@ private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
     }
 
     /**
-     * Sets the topic to consumer from. This is required.
+     * Sets the topics to consume from. This is required.
+     *
+     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+     * are in the following format:
+     * {persistent|non-persistent}://tenant/namespace/topic
+     *
+     * @param topics the topics to consume from
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> topic(String... topics) {
+        Preconditions.checkArgument(topics != null && topics.length > 0,
+                "topics cannot be blank");
+        for (String topic : topics) {
+            Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic");
+        }
+        this.topicNames.addAll(Arrays.asList(topics));
+        return this;
+    }
+
+    /**
+     * Sets the topics to consume from. This is required.
+     *
+     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+     * are in the following format:
+     * {persistent|non-persistent}://tenant/namespace/topic
+     *
+     * @param topics the topics to consume from
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> topics(List<String> topics) {
+        Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank");
+        topics.forEach(topicName ->
+                Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
+        this.topicNames.addAll(topics);
+        return this;
+    }
+
+    /**
+     * Uses a topic pattern to configure the set of topics to consume from.
+     *
+     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+     * are in the following format:
+     * {persistent|non-persistent}://tenant/namespace/topic
+     *
+     * @param topicsPattern the topic pattern to consume from
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
+        Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
+        Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set.");
+        this.topicsPattern = topicsPattern;
+        return this;
+    }
+
+    /**
+     * Uses a topic pattern string to configure the set of topics to consume from.
      *
      * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
      * are in the following format:
      * {persistent|non-persistent}://tenant/namespace/topic
      *
-     * @param topic the topic to consumer from
+     * @param topicsPattern the topic pattern string to consume from
      * @return this builder
      */
-    public PulsarSourceBuilder<T> topic(String topic) {
-        Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
-        this.topic = topic;
+    public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
+        Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
+        Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set.");
+        this.topicsPattern = Pattern.compile(topicsPattern);
         return this;
     }
 
@@ -101,9 +164,9 @@ private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
 
     public SourceFunction<T> build() {
         Preconditions.checkNotNull(serviceUrl, "a service url is required");
-        Preconditions.checkNotNull(topic, "a topic is required");
+        Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null,
+                "At least one topic or topics pattern is required");
         Preconditions.checkNotNull(subscriptionName, "a subscription name is required");
-
         return new PulsarConsumerSource<>(this);
     }
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index 5a916e8809..b6b159d99e 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -26,6 +26,8 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.regex.Pattern;
 
 /**
  * Tests for PulsarSourceBuilder
@@ -49,7 +51,7 @@ public void testBuild() {
         Assert.assertNotNull(sourceFunction);
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test(expected = IllegalArgumentException.class)
     public void testBuildWithoutSettingRequiredProperties() {
         pulsarSourceBuilder.build();
     }
@@ -74,6 +76,32 @@ public void testTopicWithBlank() {
         pulsarSourceBuilder.topic(" ");
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicsWithNull() {
+        pulsarSourceBuilder.topics(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicsWithBlank() {
+        pulsarSourceBuilder.topics(Arrays.asList(" ", " "));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicPatternWithNull() {
+        pulsarSourceBuilder.topicsPattern(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicPatternAlreadySet() {
+        pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-*"));
+        pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-my-*"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicPattenStringWithNull() {
+        pulsarSourceBuilder.topicsPatternString(null);
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionNameWithNull() {
         pulsarSourceBuilder.subscriptionName(null);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services