You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/03/15 22:05:58 UTC
[pulsar] branch master updated: Fix topic name logic for partitioned topics (#3693)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fdaa9e3 Fix topic name logic for partitioned topics (#3693)
fdaa9e3 is described below
commit fdaa9e3728e463bc67f5e946833d1dac392412e2
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Mar 15 15:05:53 2019 -0700
Fix topic name logic for partitioned topics (#3693)
* Partitioned topics have a -partition-<partitionid> suffix appended to the topic name, so
when acking explicitly, check for that suffix to determine the right topic name
* Added unit tests
---
.../instance/src/main/python/contextimpl.py | 21 +++++++++++++++------
.../src/scripts/run_python_instance_tests.sh | 3 ++-
.../src/test/python/test_python_instance.py | 22 +++++++++++++++++++++-
3 files changed, 38 insertions(+), 8 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 638e64f..f3a9710 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -23,6 +23,7 @@
"""contextimpl.py: ContextImpl class that implements the Context interface
"""
+import re
import time
import os
import json
@@ -54,7 +55,6 @@ class ContextImpl(pulsar.Context):
self.publish_producers = {}
self.publish_serializers = {}
self.message = None
- self.current_input_topic_name = None
self.current_start_time = None
self.user_config = json.loads(instance_config.function_details.userConfig) \
if instance_config.function_details.userConfig \
@@ -73,7 +73,6 @@ class ContextImpl(pulsar.Context):
# Called on a per message basis to set the context for the current message
def set_current_message_context(self, message, topic):
self.message = message
- self.current_input_topic_name = topic
self.current_start_time = time.time()
def get_message_id(self):
@@ -89,7 +88,7 @@ class ContextImpl(pulsar.Context):
return self.message.properties()
def get_current_message_topic_name(self):
- return self.current_input_topic_name
+ return self.message.topic_name()
def get_function_name(self):
return self.instance_config.function_details.name
@@ -176,9 +175,19 @@ class ContextImpl(pulsar.Context):
self.publish_producers[topic_name].send_async(output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), properties=properties)
def ack(self, msgid, topic):
- if topic not in self.consumers:
- raise ValueError('Invalid topicname %s' % topic)
- self.consumers[topic].acknowledge(msgid)
+ topic_consumer = None
+ if topic in self.consumers:
+ topic_consumer = self.consumers[topic]
+ else:
+ # if this topic is a partitioned topic
+ m = re.search('(.+)-partition-(\d+)', topic)
+ if not m:
+ raise ValueError('Invalid topicname %s' % topic)
+ elif m.group(1) in self.consumers:
+ topic_consumer = self.consumers[m.group(1)]
+ else:
+ raise ValueError('Invalid topicname %s' % topic)
+ topic_consumer.acknowledge(msgid)
def get_and_reset_metrics(self):
metrics = self.get_metrics()
diff --git a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
index 9b33c24..7005b9b 100644
--- a/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
+++ b/pulsar-functions/instance/src/scripts/run_python_instance_tests.sh
@@ -22,9 +22,10 @@
# Make sure dependencies are installed
pip install mock --user
pip install protobuf --user
+pip install fastavro --user
CUR_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
PULSAR_HOME=$CUR_DIR/../../../../
# run instance tests
-PULSAR_HOME=${PULSAR_HOME} PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance python -m unittest discover -v ${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests
\ No newline at end of file
+PULSAR_HOME=${PULSAR_HOME} PYTHONPATH=${PULSAR_HOME}/pulsar-functions/instance/target/python-instance python -m unittest discover -v ${PULSAR_HOME}/pulsar-functions/instance/target/python-instance/tests
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 0071e2f..8b92fa8 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -20,9 +20,12 @@
# DEPENDENCIES: unittest2,mock
+from mock import Mock
+import sys
+sys.modules['prometheus_client'] = Mock()
+
from contextimpl import ContextImpl
from python_instance import InstanceConfig
-from mock import Mock
from pulsar import Message
import Function_pb2
@@ -68,4 +71,21 @@ class TestContextImpl(unittest.TestCase):
self.assertEqual(args[1].args[1], "test_topic_name")
self.assertEqual(args[1].args[2], "test_message_id")
+ def test_context_ack_partitionedtopic(self):
+ instance_id = 'test_instance_id'
+ function_id = 'test_function_id'
+ function_version = 'test_function_version'
+ function_details = Function_pb2.FunctionDetails()
+ max_buffered_tuples = 100;
+ instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
+ logger = log.Log
+ pulsar_client = Mock()
+ user_code=__file__
+ consumer = Mock()
+ consumer.acknowledge = Mock(return_value=None)
+ consumers = {"mytopic" : consumer}
+ context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None, None, None, None)
+ context_impl.ack("test_message_id", "mytopic-partition-3")
+ args, kwargs = consumer.acknowledge.call_args
+ self.assertEqual(args[0], "test_message_id")
\ No newline at end of file