You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/24 04:27:01 UTC
[flink-statefun] 02/04: [FLINK-16569] [py] Allow empty keys in
kafka_egress_builder method
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit d74c1b24da0ab8621924b50756ef6d68e30aafb7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:16:15 2020 +0800
[FLINK-16569] [py] Allow empty keys in kafka_egress_builder method
---
statefun-python-sdk/statefun/core.py | 9 ++++-----
statefun-python-sdk/tests/request_reply_test.py | 2 ++
2 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index 20fbf88..e962c6f 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -189,23 +189,22 @@ class StatefulFunctions:
return self.functions[(namespace, type)]
-def kafka_egress_builder(topic: str, key: str, value):
+def kafka_egress_builder(topic: str, value, key: str = None):
"""
Build a ProtobufMessage that can be emitted to a Protobuf based egress.
:param topic: The Kafka destination topic for that record
- :param key: the utf8 encoded string key to produce
+ :param key: the utf8 encoded string key to produce (optional; when None, no key is set on the record)
:param value: the Protobuf value to produce
:return: A Protobuf message that represents the record to be produced via the Kafka producer.
"""
if not topic:
raise ValueError("A destination Kafka topic is missing")
- if not key:
- raise ValueError("A key is missing")
if not value:
raise ValueError("Missing value")
record = KafkaProducerRecord()
record.topic = topic
- record.key = key
record.value_bytes = value.SerializeToString()
+ if key is not None:
+ record.key = key
return record
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index a749bcd..6c80d00 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -127,6 +127,8 @@ class RequestReplyTestCase(unittest.TestCase):
# kafka egress
context.pack_and_send_egress("sdk/kafka",
kafka_egress_builder(topic="hello", key=u"hello world", value=seen))
+ context.pack_and_send_egress("sdk/kafka",
+ kafka_egress_builder(topic="hello", value=seen))
#
# build the invocation