You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/24 04:27:01 UTC

[flink-statefun] 02/04: [FLINK-16569] [py] Allow empty keys in kafka_egress_builder method

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit d74c1b24da0ab8621924b50756ef6d68e30aafb7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:16:15 2020 +0800

    [FLINK-16569] [py] Allow empty keys in kafka_egress_builder method
---
 statefun-python-sdk/statefun/core.py            | 9 ++++-----
 statefun-python-sdk/tests/request_reply_test.py | 2 ++
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index 20fbf88..e962c6f 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -189,23 +189,22 @@ class StatefulFunctions:
         return self.functions[(namespace, type)]
 
 
-def kafka_egress_builder(topic: str, key: str, value):
+def kafka_egress_builder(topic: str, value, key: str = None):
     """
     Build a ProtobufMessage that can be emitted to a Protobuf based egress.
 
     :param topic: The kafka detention topic for that record
-    :param key: the utf8 encoded string key to produce
+    :param key: the utf8 encoded string key to produce (can be empty)
     :param value: the Protobuf value to produce
     :return: A Protobuf message represents the record to be produced via the kafka procurer.
     """
     if not topic:
         raise ValueError("A destination Kafka topic is missing")
-    if not key:
-        raise ValueError("A key is missing")
     if not value:
         raise ValueError("Missing value")
     record = KafkaProducerRecord()
     record.topic = topic
-    record.key = key
     record.value_bytes = value.SerializeToString()
+    if key is not None:
+        record.key = key
     return record
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index a749bcd..6c80d00 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -127,6 +127,8 @@ class RequestReplyTestCase(unittest.TestCase):
             # kafka egress
             context.pack_and_send_egress("sdk/kafka",
                                          kafka_egress_builder(topic="hello", key=u"hello world", value=seen))
+            context.pack_and_send_egress("sdk/kafka",
+                                         kafka_egress_builder(topic="hello", value=seen))
 
         #
         # build the invocation