You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/06 08:01:06 UTC

[incubator-pulsar] branch master updated: TopicPatterns are now supported by python functions (#2506)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 669196c  TopicPatterns are now supported by python functions (#2506)
669196c is described below

commit 669196c27d3c2028a112d939774744dae8ab1b88
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Sep 6 01:01:03 2018 -0700

    TopicPatterns are now supported by python functions (#2506)
---
 .../instance/src/main/python/python_instance.py    | 26 ++++++++++++++--------
 .../src/main/python/python_instance_main.py        |  3 ---
 .../functions/utils/validation/ValidatorImpls.java |  9 --------
 3 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index b43b882..ddd546e 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -172,7 +172,7 @@ class PythonInstance(object):
       self.consumers[topic] = self.pulsar_client.subscribe(
         str(topic), subscription_name,
         consumer_type=mode,
-        message_listener=partial(self.message_listener, topic, self.input_serdes[topic]),
+        message_listener=partial(self.message_listener, self.input_serdes[topic]),
         unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
       )
 
@@ -183,12 +183,20 @@ class PythonInstance(object):
         serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName)
       self.input_serdes[topic] = serde_kclass()
       Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
-      self.consumers[topic] = self.pulsar_client.subscribe(
-        str(topic), subscription_name,
-        consumer_type=mode,
-        message_listener=partial(self.message_listener, topic, self.input_serdes[topic]),
-        unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
-      )
+      if consumer_conf.isRegexPattern:
+        self.consumers[topic] = self.pulsar_client.subscribe_pattern(
+          str(topic), subscription_name,
+          consumer_type=mode,
+          message_listener=partial(self.message_listener, self.input_serdes[topic]),
+          unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+        )
+      else:
+        self.consumers[topic] = self.pulsar_client.subscribe(
+          str(topic), subscription_name,
+          consumer_type=mode,
+          message_listener=partial(self.message_listener, self.input_serdes[topic]),
+          unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+        )
 
     function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className)
     if function_kclass is None:
@@ -301,8 +309,8 @@ class PythonInstance(object):
         batching_max_publish_delay_ms=1,
         max_pending_messages=100000)
 
-  def message_listener(self, topic, serde, consumer, message):
-    item = InternalMessage(message, topic, serde, consumer)
+  def message_listener(self, serde, consumer, message):
+    item = InternalMessage(message, consumer.topic(), serde, consumer)
     self.queue.put(item, True)
     if self.atmost_once and self.auto_ack:
       consumer.acknowledge(message)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 514d7fe..d9f1132 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -82,9 +82,6 @@ def main():
 
   Log.info("Starting Python instance with %s" % str(args))
 
-  if function_details.source.topicsPattern:
-    raise ValueError('topicsPattern is not supported by python client')
-
   authentication = None
   use_tls = False
   tls_allow_insecure_connection = False
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index cbeb970..f264d2a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -506,15 +506,6 @@ public class ValidatorImpls {
             if (functionConfig.getWindowConfig() != null) {
                 throw new IllegalArgumentException("There is currently no support windowing in python");
             }
-
-            if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
-                throw new IllegalArgumentException("Topic-patterns is not supported for python runtime");
-            }
-            functionConfig.getInputSpecs().forEach((topic, conf) -> {
-                if (conf.isRegexPattern()) {
-                    throw new IllegalArgumentException("Topic-patterns is not supported for python runtime");
-                }
-            });
         }
 
         private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {