You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/06 08:01:06 UTC
[incubator-pulsar] branch master updated: TopicPatterns are now supported by python functions (#2506)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 669196c TopicPatterns are now supported by python functions (#2506)
669196c is described below
commit 669196c27d3c2028a112d939774744dae8ab1b88
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Sep 6 01:01:03 2018 -0700
TopicPatterns are now supported by python functions (#2506)
---
.../instance/src/main/python/python_instance.py | 26 ++++++++++++++--------
.../src/main/python/python_instance_main.py | 3 ---
.../functions/utils/validation/ValidatorImpls.java | 9 --------
3 files changed, 17 insertions(+), 21 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index b43b882..ddd546e 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -172,7 +172,7 @@ class PythonInstance(object):
self.consumers[topic] = self.pulsar_client.subscribe(
str(topic), subscription_name,
consumer_type=mode,
- message_listener=partial(self.message_listener, topic, self.input_serdes[topic]),
+ message_listener=partial(self.message_listener, self.input_serdes[topic]),
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
)
@@ -183,12 +183,20 @@ class PythonInstance(object):
serde_kclass = util.import_class(os.path.dirname(self.user_code), consumer_conf.serdeClassName)
self.input_serdes[topic] = serde_kclass()
Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
- self.consumers[topic] = self.pulsar_client.subscribe(
- str(topic), subscription_name,
- consumer_type=mode,
- message_listener=partial(self.message_listener, topic, self.input_serdes[topic]),
- unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
- )
+ if consumer_conf.isRegexPattern:
+ self.consumers[topic] = self.pulsar_client.subscribe_pattern(
+ str(topic), subscription_name,
+ consumer_type=mode,
+ message_listener=partial(self.message_listener, self.input_serdes[topic]),
+ unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+ )
+ else:
+ self.consumers[topic] = self.pulsar_client.subscribe(
+ str(topic), subscription_name,
+ consumer_type=mode,
+ message_listener=partial(self.message_listener, self.input_serdes[topic]),
+ unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+ )
function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className)
if function_kclass is None:
@@ -301,8 +309,8 @@ class PythonInstance(object):
batching_max_publish_delay_ms=1,
max_pending_messages=100000)
- def message_listener(self, topic, serde, consumer, message):
- item = InternalMessage(message, topic, serde, consumer)
+ def message_listener(self, serde, consumer, message):
+ item = InternalMessage(message, consumer.topic(), serde, consumer)
self.queue.put(item, True)
if self.atmost_once and self.auto_ack:
consumer.acknowledge(message)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 514d7fe..d9f1132 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -82,9 +82,6 @@ def main():
Log.info("Starting Python instance with %s" % str(args))
- if function_details.source.topicsPattern:
- raise ValueError('topicsPattern is not supported by python client')
-
authentication = None
use_tls = False
tls_allow_insecure_connection = False
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index cbeb970..f264d2a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -506,15 +506,6 @@ public class ValidatorImpls {
if (functionConfig.getWindowConfig() != null) {
throw new IllegalArgumentException("There is currently no support windowing in python");
}
-
- if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
- throw new IllegalArgumentException("Topic-patterns is not supported for python runtime");
- }
- functionConfig.getInputSpecs().forEach((topic, conf) -> {
- if (conf.isRegexPattern()) {
- throw new IllegalArgumentException("Topic-patterns is not supported for python runtime");
- }
- });
}
private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {