You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/10 23:45:11 UTC

[GitHub] srkukarni closed pull request #2760: Fix Topic Pattern functionality for Python functions

srkukarni closed pull request #2760: Fix Topic Pattern functionality for Python functions
URL: https://github.com/apache/pulsar/pull/2760
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 54a7329265..5a1bff541b 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -36,6 +36,7 @@
 from threading import Timer
 import traceback
 import sys
+import re
 
 import pulsar
 import contextimpl
@@ -202,8 +203,8 @@ def run(self):
       self.input_serdes[topic] = serde_kclass()
       Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
       if consumer_conf.isRegexPattern:
-        self.consumers[topic] = self.pulsar_client.subscribe_pattern(
-          str(topic), subscription_name,
+        self.consumers[topic] = self.pulsar_client.subscribe(
+          re.compile(str(topic)), subscription_name,
           consumer_type=mode,
           message_listener=partial(self.message_listener, self.input_serdes[topic]),
           unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b8a1daefd2..0ea5404913 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -24,8 +24,11 @@
 
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
+
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -581,22 +584,48 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou
 
     @Test(enabled = false)
     public void testPythonExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.PYTHON);
+        testExclamationFunction(Runtime.PYTHON, false);
+    }
+
+    @Test(enabled = false)
+    public void testPythonExclamationTopicPatternFunction() throws Exception {
+        testExclamationFunction(Runtime.PYTHON, true);
     }
 
     @Test
     public void testJavaExclamationFunction() throws Exception {
-        testExclamationFunction(Runtime.JAVA);
+        testExclamationFunction(Runtime.JAVA, false);
+    }
+
+    @Test
+    public void testJavaExclamationTopicPatternFunction() throws Exception {
+        testExclamationFunction(Runtime.JAVA, true);
     }
 
-    private void testExclamationFunction(Runtime runtime) throws Exception {
+    private void testExclamationFunction(Runtime runtime, boolean isTopicPattern) throws Exception {
         if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
             // python can only run on process mode
             return;
         }
 
-        String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
+        String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
         String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
+        if (isTopicPattern) {
+            @Cleanup PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                    .build();
+            @Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING)
+                    .topic(inputTopicName + "1")
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscriptionName("test-sub")
+                    .subscribe();
+            @Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING)
+                    .topic(inputTopicName + "2")
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscriptionName("test-sub")
+                    .subscribe();
+            inputTopicName = inputTopicName + ".*";
+        }
         String functionName = "test-exclamation-fn-" + randomName(8);
         final int numMessages = 10;
 
@@ -640,7 +669,11 @@ private static void submitExclamationFunction(Runtime runtime,
                                            String functionClass,
                                            Schema<T> inputTopicSchema) throws Exception {
         CommandGenerator generator;
-        generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
+        if (inputTopicName.endsWith(".*")) {
+            generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
+        } else {
+            generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
+        }
         generator.setSinkTopic(outputTopicName);
         generator.setFunctionName(functionName);
         String command;
@@ -731,17 +764,40 @@ private static void publishAndConsumeMessages(String inputTopic,
             .subscriptionType(SubscriptionType.Exclusive)
             .subscriptionName("test-sub")
             .subscribe();
-        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
-            .topic(inputTopic)
-            .create();
+        if (inputTopic.endsWith(".*")) {
+            @Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING)
+                    .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
+                    .create();
+            @Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING)
+                    .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
+                    .create();
+
+            for (int i = 0; i < numMessages / 2; i++) {
+                producer1.send("message-" + i);
+            }
+
+            for (int i = numMessages / 2; i < numMessages; i++) {
+                producer2.send("message-" + i);
+            }
+        } else {
+            @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                    .topic(inputTopic)
+                    .create();
 
+            for (int i = 0; i < numMessages; i++) {
+                producer.send("message-" + i);
+            }
+        }
+
+        Set<String> expectedMessages = new HashSet<>();
         for (int i = 0; i < numMessages; i++) {
-            producer.send("message-" + i);
+            expectedMessages.add("message-" + i + "!");
         }
 
         for (int i = 0; i < numMessages; i++) {
-            Message<String> msg = consumer.receive();
-            assertEquals("message-" + i + "!", msg.getValue());
+            Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+            assertTrue(expectedMessages.contains(msg.getValue()));
+            expectedMessages.remove(msg.getValue());
         }
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 6f4d012e24..64b70a63f9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -39,6 +39,7 @@
     private String namespace = "default";
     private String functionClassName;
     private String sourceTopic;
+    private String sourceTopicPattern;
     private Map<String, String> customSereSourceTopics;
     private String sinkTopic;
     private String logTopic;
@@ -64,28 +65,14 @@ public static CommandGenerator createDefaultGenerator(String sourceTopic, String
         return generator;
     }
 
-    public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics,
-                                                          String functionClassName) {
+    public static CommandGenerator createTopicPatternGenerator(String sourceTopicPattern, String functionClassName) {
         CommandGenerator generator = new CommandGenerator();
-        generator.setCustomSereSourceTopics(customSereSourceTopics);
+        generator.setSourceTopicPattern(sourceTopicPattern);
         generator.setFunctionClassName(functionClassName);
         generator.setRuntime(Runtime.JAVA);
         return generator;
     }
 
-    public static CommandGenerator createDefaultGenerator(String tenant, String namespace, String functionName) {
-        CommandGenerator generator = new CommandGenerator();
-        generator.setTenant(tenant);
-        generator.setNamespace(namespace);
-        generator.setFunctionName(functionName);
-        generator.setRuntime(Runtime.JAVA);
-        return generator;
-    }
-
-    public void createAdminUrl(String workerHost, int port) {
-        adminUrl = "http://" + workerHost + ":" + port;
-    }
-
     public String generateCreateFunctionCommand() {
         return generateCreateFunctionCommand(null);
     }
@@ -110,6 +97,9 @@ public String generateCreateFunctionCommand(String codeFile) {
         if (sourceTopic != null) {
             commandBuilder.append(" --inputs " + sourceTopic);
         }
+        if (sourceTopicPattern != null) {
+            commandBuilder.append(" --topics-pattern " + sourceTopicPattern);
+        }
         if (logTopic != null) {
             commandBuilder.append(" --logTopic " + logTopic);
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services