You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/10 23:45:13 UTC

[pulsar] branch master updated: Fix Topic Pattern functionality for Python functions (#2760)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ed2e33  Fix Topic Pattern functionality for Python functions (#2760)
4ed2e33 is described below

commit 4ed2e336a27a5e987878955d033f1f396fa3b184
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Oct 10 16:45:09 2018 -0700

    Fix Topic Pattern functionality for Python functions (#2760)
    
    * pulsar client no longer has subscribe_pattern interface
    
    * Added integration tests for topic pattern
    
    * Fixed integration test
---
 .../instance/src/main/python/python_instance.py    |  5 +-
 .../integration/functions/PulsarFunctionsTest.java | 78 +++++++++++++++++++---
 .../functions/utils/CommandGenerator.java          | 22 ++----
 3 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 54a7329..5a1bff5 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -36,6 +36,7 @@ from collections import namedtuple
 from threading import Timer
 import traceback
 import sys
+import re
 
 import pulsar
 import contextimpl
@@ -202,8 +203,8 @@ class PythonInstance(object):
       self.input_serdes[topic] = serde_kclass()
       Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
       if consumer_conf.isRegexPattern:
-        self.consumers[topic] = self.pulsar_client.subscribe_pattern(
-          str(topic), subscription_name,
+        self.consumers[topic] = self.pulsar_client.subscribe(
+          re.compile(str(topic)), subscription_name,
           consumer_type=mode,
           message_listener=partial(self.message_listener, self.input_serdes[topic]),
           unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b8a1dae..0ea5404 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -24,8 +24,11 @@ import static org.testng.Assert.fail;
 
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
+
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -581,22 +584,48 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
     @Test(enabled = false)
     public void testPythonExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON);
+        testExclamationFunction(Runtime.PYTHON, false);
+    }
+
+    @Test(enabled = false)
+    public void testPythonExclamationTopicPatternFunction() throws Exception {
+        testExclamationFunction(Runtime.PYTHON, true);
     }
 
     @Test
     public void testJavaExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA);
+        testExclamationFunction(Runtime.JAVA, false);
+    }
+
+    @Test
+    public void testJavaExclamationTopicPatternFunction() throws Exception {
+        testExclamationFunction(Runtime.JAVA, true);
     }
 
-    private void testExclamationFunction(Runtime runtime) throws Exception {
+    private void testExclamationFunction(Runtime runtime, boolean isTopicPattern) throws Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
             // python can only run on process mode
             return;
         }
 
-        String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
+        String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
         String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
+        if (isTopicPattern) {
+            @Cleanup PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                    .build();
+            @Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING)
+                    .topic(inputTopicName + "1")
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscriptionName("test-sub")
+                    .subscribe();
+            @Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING)
+                    .topic(inputTopicName + "2")
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscriptionName("test-sub")
+                    .subscribe();
+            inputTopicName = inputTopicName + ".*";
+        }
         String functionName = "test-exclamation-fn-" + randomName(8);
         final int numMessages = 10;
 
@@ -640,7 +669,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                            String functionClass,
                                            Schema<T> inputTopicSchema) throws Exception {
         CommandGenerator generator;
-        generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
+        if (inputTopicName.endsWith(".*")) {
+            generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
+        } else {
+            generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
+        }
         generator.setSinkTopic(outputTopicName);
         generator.setFunctionName(functionName);
         String command;
@@ -731,17 +764,40 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             .subscriptionType(SubscriptionType.Exclusive)
             .subscriptionName("test-sub")
             .subscribe();
-        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
-            .topic(inputTopic)
-            .create();
+        if (inputTopic.endsWith(".*")) {
+            @Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING)
+                    .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
+                    .create();
+            @Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING)
+                    .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
+                    .create();
+
+            for (int i = 0; i < numMessages / 2; i++) {
+                producer1.send("message-" + i);
+            }
+
+            for (int i = numMessages / 2; i < numMessages; i++) {
+                producer2.send("message-" + i);
+            }
+        } else {
+            @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                    .topic(inputTopic)
+                    .create();
 
+            for (int i = 0; i < numMessages; i++) {
+                producer.send("message-" + i);
+            }
+        }
+
+        Set<String> expectedMessages = new HashSet<>();
         for (int i = 0; i < numMessages; i++) {
-            producer.send("message-" + i);
+            expectedMessages.add("message-" + i + "!");
         }
 
         for (int i = 0; i < numMessages; i++) {
-            Message<String> msg = consumer.receive();
-            assertEquals("message-" + i + "!", msg.getValue());
+            Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+            assertTrue(expectedMessages.contains(msg.getValue()));
+            expectedMessages.remove(msg.getValue());
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 6f4d012..64b70a6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -39,6 +39,7 @@ public class CommandGenerator {
     private String namespace = "default";
     private String functionClassName;
     private String sourceTopic;
+    private String sourceTopicPattern;
     private Map<String, String> customSereSourceTopics;
     private String sinkTopic;
     private String logTopic;
@@ -64,28 +65,14 @@ public class CommandGenerator {
         return generator;
     }
 
-    public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics,
-                                                          String functionClassName) {
+    public static CommandGenerator createTopicPatternGenerator(String sourceTopicPattern, String functionClassName) {
         CommandGenerator generator = new CommandGenerator();
-        generator.setCustomSereSourceTopics(customSereSourceTopics);
+        generator.setSourceTopicPattern(sourceTopicPattern);
         generator.setFunctionClassName(functionClassName);
         generator.setRuntime(Runtime.JAVA);
         return generator;
     }
 
-    public static CommandGenerator createDefaultGenerator(String tenant, String namespace, String functionName) {
-        CommandGenerator generator = new CommandGenerator();
-        generator.setTenant(tenant);
-        generator.setNamespace(namespace);
-        generator.setFunctionName(functionName);
-        generator.setRuntime(Runtime.JAVA);
-        return generator;
-    }
-
-    public void createAdminUrl(String workerHost, int port) {
-        adminUrl = "http://" + workerHost + ":" + port;
-    }
-
     public String generateCreateFunctionCommand() {
         return generateCreateFunctionCommand(null);
     }
@@ -110,6 +97,9 @@ public class CommandGenerator {
         if (sourceTopic != null) {
             commandBuilder.append(" --inputs " + sourceTopic);
         }
+        if (sourceTopicPattern != null) {
+            commandBuilder.append(" --topics-pattern " + sourceTopicPattern);
+        }
         if (logTopic != null) {
             commandBuilder.append(" --logTopic " + logTopic);
         }