You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/28 05:43:17 UTC

[pulsar] branch master updated: Support multi-topic and pattern-topic for PulsarConsumerSource in pulsar (#3256)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a010166  Support multi-topic and pattern-topic for PulsarConsumerSource in pulsar (#3256)
a010166 is described below

commit a010166f9e67587a065fbe50657a9c76fb7403a2
Author: penghui <co...@gmail.com>
AuthorDate: Fri Dec 28 13:43:12 2018 +0800

    Support multi-topic and pattern-topic for PulsarConsumerSource in pulsar (#3256)
    
    ### Motivation
    
    Fixes #3255
    
    Make it easier to consume multiple topics with PulsarConsumerSource.
    
    ### Modifications
    
    Add multi-topic and topic-pattern configuration to PulsarConsumerSource.
    
    ### Result
    
    Unit tests pass.
---
 .../connectors/pulsar/PulsarConsumerSource.java    | 25 +++++--
 .../connectors/pulsar/PulsarSourceBuilder.java     | 79 +++++++++++++++++++---
 .../connectors/pulsar/PulsarSourceBuilderTest.java | 30 +++++++-
 3 files changed, 118 insertions(+), 16 deletions(-)

diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 84e0e50..6479bf0 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.util.IOUtils;
@@ -41,6 +42,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 /**
  * Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
@@ -55,7 +57,8 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
 
     private final int messageReceiveTimeoutMs = 100;
     private final String serviceUrl;
-    private final String topic;
+    private final Set<String> topicNames;
+    private final Pattern topicsPattern;
     private final String subscriptionName;
     private final DeserializationSchema<T> deserializer;
 
@@ -72,7 +75,8 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
     PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
         super(MessageId.class);
         this.serviceUrl = builder.serviceUrl;
-        this.topic = builder.topic;
+        this.topicNames = builder.topicNames;
+        this.topicsPattern = builder.topicsPattern;
         this.deserializer = builder.deserializationSchema;
         this.subscriptionName = builder.subscriptionName;
         this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
@@ -191,10 +195,17 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
     }
 
     Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
-        return client.newConsumer()
-            .topic(topic)
-            .subscriptionName(subscriptionName)
-            .subscriptionType(SubscriptionType.Failover)
-            .subscribe();
+        if (topicsPattern != null) {
+            return client.newConsumer().topicsPattern(topicsPattern)
+                    .subscriptionName(subscriptionName)
+                    .subscriptionType(SubscriptionType.Failover)
+                    .subscribe();
+        } else {
+            return client.newConsumer()
+                    .topic(topicNames.toArray(new String[0]))
+                    .subscriptionName(subscriptionName)
+                    .subscriptionType(SubscriptionType.Failover)
+                    .subscribe();
+        }
     }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 9605f07..7d06806 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -24,6 +24,12 @@ import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+
 /**
  * A class for building a pulsar source.
  */
@@ -36,7 +42,8 @@ public class PulsarSourceBuilder<T> {
 
     final DeserializationSchema<T> deserializationSchema;
     String serviceUrl = SERVICE_URL;
-    String topic;
+    final Set<String> topicNames = new TreeSet<>();
+    Pattern topicsPattern;
     String subscriptionName = "flink-sub";
     long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
 
@@ -57,18 +64,74 @@ public class PulsarSourceBuilder<T> {
     }
 
     /**
-     * Sets the topic to consumer from. This is required.
+     * Adds one or more topics to consume from. Either a topic or a topics pattern is required.
+     *
+     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+     * are in the following format:
+     * {persistent|non-persistent}://tenant/namespace/topic
+     *
+     * @param topics the topics to consume from; must be non-empty and contain no blank names
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> topic(String... topics) {
+        Preconditions.checkArgument(topics != null && topics.length > 0,
+                "topics cannot be blank");
+        for (String topic : topics) {
+            Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topicNames cannot have blank topic");
+        }
+        this.topicNames.addAll(Arrays.asList(topics));
+        return this;
+    }
+
+    /**
+     * Adds a list of topics to consume from. Either a topic or a topics pattern is required.
+     *
+     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+     * are in the following format:
+     * {persistent|non-persistent}://tenant/namespace/topic
+     *
+     * @param topics the topics to consume from; must be non-null, non-empty and contain no blank names
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> topics(List<String> topics) {
+        Preconditions.checkArgument(topics != null && !topics.isEmpty(), "topics cannot be blank");
+        topics.forEach(topicName ->
+                Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicNames cannot have blank topic"));
+        this.topicNames.addAll(topics);
+        return this;
+    }
+
+    /**
+     * Sets a pattern selecting the topics to consume from. Either a topic or a topics pattern is required.
+     *
+     * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
+     * are in the following format:
+     * {persistent|non-persistent}://tenant/namespace/topic
+     *
+     * @param topicsPattern pattern selecting the topics; must be non-null and no pattern may be set yet
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
+        Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
+        Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set.");
+        this.topicsPattern = topicsPattern;
+        return this;
+    }
+
+    /**
+     * Sets a pattern, given as a regular-expression string, selecting the topics to consume from.
      *
      * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
      * are in the following format:
      * {persistent|non-persistent}://tenant/namespace/topic
      *
-     * @param topic the topic to consumer from
+     * @param topicsPattern regular expression to compile; must not be blank and no pattern may be set yet
      * @return this builder
      */
-    public PulsarSourceBuilder<T> topic(String topic) {
-        Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
-        this.topic = topic;
+    public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
+        Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
+        Preconditions.checkArgument(this.topicsPattern == null, "Pattern has already been set.");
+        this.topicsPattern = Pattern.compile(topicsPattern);
         return this;
     }
 
@@ -101,9 +164,9 @@ public class PulsarSourceBuilder<T> {
 
     public SourceFunction<T> build() {
         Preconditions.checkNotNull(serviceUrl, "a service url is required");
-        Preconditions.checkNotNull(topic, "a topic is required");
+        Preconditions.checkArgument(!topicNames.isEmpty() ^ (topicsPattern != null),
+                "Exactly one of topic names or a topics pattern is required");
         Preconditions.checkNotNull(subscriptionName, "a subscription name is required");
-
         return new PulsarConsumerSource<>(this);
     }
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index 5a916e8..b6b159d 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -26,6 +26,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.regex.Pattern;
 
 /**
  * Tests for PulsarSourceBuilder
@@ -49,7 +51,7 @@ public class PulsarSourceBuilderTest {
         Assert.assertNotNull(sourceFunction);
     }
 
-    @Test(expected = NullPointerException.class)
+    @Test(expected = IllegalArgumentException.class)
     public void testBuildWithoutSettingRequiredProperties() {
         pulsarSourceBuilder.build();
     }
@@ -75,6 +77,32 @@ public class PulsarSourceBuilderTest {
     }
 
     @Test(expected = IllegalArgumentException.class)
+    public void testTopicsWithNull() {
+        pulsarSourceBuilder.topics(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicsWithBlank() {
+        pulsarSourceBuilder.topics(Arrays.asList(" ", " "));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicPatternWithNull() {
+        pulsarSourceBuilder.topicsPattern(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicPatternAlreadySet() {
+        pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-.*"));
+        pulsarSourceBuilder.topicsPattern(Pattern.compile("persistent://tenants/ns/topic-my-.*"));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testTopicPatternStringWithNull() {
+        pulsarSourceBuilder.topicsPatternString(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionNameWithNull() {
         pulsarSourceBuilder.subscriptionName(null);
     }