Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/24 06:12:52 UTC

[flink-statefun] branch release-2.0 updated (ef036db -> 47e996d)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from ef036db  [hotfix] [kafka] Non-set Kafka keys should be empty strings
     new 9e20f51  [FLINK-16738] [py] Add kinesis_egress_record builder method to Python SDK
     new 47e996d  [hotfix] [py] Improve PyDoc for kafka_egress_record method

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 statefun-python-sdk/statefun/__init__.py           |  3 +-
 statefun-python-sdk/statefun/core.py               | 32 ++++++++++++++---
 .../{kafka_egress_pb2.py => kinesis_egress_pb2.py} | 41 +++++++++++++---------
 statefun-python-sdk/tests/request_reply_test.py    | 14 ++++++++
 4 files changed, 68 insertions(+), 22 deletions(-)
 copy statefun-python-sdk/statefun/{kafka_egress_pb2.py => kinesis_egress_pb2.py} (63%)


[flink-statefun] 01/02: [FLINK-16738] [py] Add kinesis_egress_record builder method to Python SDK

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 9e20f513a5a71bb9a8e36ac9698996dc58e7457f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:51:17 2020 +0800

    [FLINK-16738] [py] Add kinesis_egress_record builder method to Python SDK
---
 statefun-python-sdk/statefun/__init__.py           |   3 +-
 statefun-python-sdk/statefun/core.py               |  26 ++++-
 statefun-python-sdk/statefun/kinesis_egress_pb2.py | 107 +++++++++++++++++++++
 statefun-python-sdk/tests/request_reply_test.py    |  14 +++
 4 files changed, 148 insertions(+), 2 deletions(-)

diff --git a/statefun-python-sdk/statefun/__init__.py b/statefun-python-sdk/statefun/__init__.py
index c0268fa..06ba3ed 100644
--- a/statefun-python-sdk/statefun/__init__.py
+++ b/statefun-python-sdk/statefun/__init__.py
@@ -19,4 +19,5 @@
 from statefun.core import StatefulFunctions
 from statefun.request_reply import RequestReplyHandler
 
-from statefun.core import kafka_egress_record
\ No newline at end of file
+from statefun.core import kafka_egress_record
+from statefun.core import kinesis_egress_record
\ No newline at end of file
diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index 0a80e94..1b14a82 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -20,7 +20,7 @@ from google.protobuf.any_pb2 import Any
 import inspect
 
 from statefun.kafka_egress_pb2 import KafkaProducerRecord
-
+from statefun.kinesis_egress_pb2 import KinesisEgressRecord
 
 class SdkAddress(object):
     def __init__(self, namespace, type, identity):
@@ -208,3 +208,27 @@ def kafka_egress_record(topic: str, value, key: str = None):
     if key is not None:
         record.key = key
     return record
+
+def kinesis_egress_record(stream: str, value, partition_key: str, explicit_hash_key: str = None):
+    """
+    Build a ProtobufMessage that can be emitted to a Kinesis generic egress.
+
+    :param stream: The AWS Kinesis destination stream for that record
+    :param partition_key: the utf8 encoded string partition key to use
+    :param value: the Protobuf value to produce
+    :param explicit_hash_key: a utf8 encoded string explicit hash key to use (can be empty)
+    :return: A Protobuf message representing the record to be produced to AWS Kinesis via the Kinesis generic egress.
+    """
+    if not stream:
+        raise ValueError("Missing destination Kinesis stream")
+    if not value:
+        raise ValueError("Missing value")
+    if not partition_key:
+        raise ValueError("Missing partition key")
+    record = KinesisEgressRecord()
+    record.stream = stream
+    record.value_bytes = value.SerializeToString()
+    record.partition_key = partition_key
+    if explicit_hash_key is not None:
+        record.explicit_hash_key = explicit_hash_key
+    return record
diff --git a/statefun-python-sdk/statefun/kinesis_egress_pb2.py b/statefun-python-sdk/statefun/kinesis_egress_pb2.py
new file mode 100644
index 0000000..0a5b5ac
--- /dev/null
+++ b/statefun-python-sdk/statefun/kinesis_egress_pb2.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: kinesis-egress.proto
+
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='kinesis-egress.proto',
+  package='org.apache.flink.statefun.flink.io',
+  syntax='proto3',
+  serialized_options=b'\n,org.apache.flink.statefun.flink.io.generatedP\001',
+  serialized_pb=b'\n\x14kinesis-egress.proto\x12\"org.apache.flink.statefun.flink.io\"l\n\x13KinesisEgressRecord\x12\x15\n\rpartition_key\x18\x01 \x01(\t\x12\x13\n\x0bvalue_bytes\x18\x02 \x01(\x0c\x12\x0e\n\x06stream\x18\x03 \x01(\t\x12\x19\n\x11\x65xplicit_hash_key\x18\x04 \x01(\tB0\n,org.apache.flink.statefun.flink.io.generatedP\x01\x62\x06proto3'
+)
+
+
+
+
+_KINESISEGRESSRECORD = _descriptor.Descriptor(
+  name='KinesisEgressRecord',
+  full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='partition_key', full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord.partition_key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='value_bytes', full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord.value_bytes', index=1,
+      number=2, type=12, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"",
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='stream', full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord.stream', index=2,
+      number=3, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='explicit_hash_key', full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord.explicit_hash_key', index=3,
+      number=4, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=60,
+  serialized_end=168,
+)
+
+DESCRIPTOR.message_types_by_name['KinesisEgressRecord'] = _KINESISEGRESSRECORD
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+KinesisEgressRecord = _reflection.GeneratedProtocolMessageType('KinesisEgressRecord', (_message.Message,), {
+  'DESCRIPTOR' : _KINESISEGRESSRECORD,
+  '__module__' : 'kinesis_egress_pb2'
+  # @@protoc_insertion_point(class_scope:org.apache.flink.statefun.flink.io.KinesisEgressRecord)
+  })
+_sym_db.RegisterMessage(KinesisEgressRecord)
+
+
+DESCRIPTOR._options = None
+# @@protoc_insertion_point(module_scope)
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index 3e7437a..2a2a956 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -26,6 +26,7 @@ from tests.examples_pb2 import LoginEvent, SeenCount
 from statefun.request_reply_pb2 import ToFunction, FromFunction
 from statefun import RequestReplyHandler
 from statefun.core import StatefulFunctions, kafka_egress_record
+from statefun.core import kinesis_egress_record
 
 
 class InvocationBuilder(object):
@@ -130,6 +131,19 @@ class RequestReplyTestCase(unittest.TestCase):
             context.pack_and_send_egress("sdk/kafka",
                                          kafka_egress_record(topic="hello", value=seen))
 
+            # AWS Kinesis generic egress
+            context.pack_and_send_egress("sdk/kinesis",
+                                         kinesis_egress_record(
+                                             stream="hello",
+                                             partition_key=u"hello world",
+                                             value=seen,
+                                             explicit_hash_key=u"1234"))
+            context.pack_and_send_egress("sdk/kinesis",
+                                         kinesis_egress_record(
+                                             stream="hello",
+                                             partition_key=u"hello world",
+                                             value=seen))
+
         #
         # build the invocation
         #
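
Note on usage: the test hunk above shows the intended call pattern for the
new builder. The following is a minimal, illustrative sketch of the same
usage inside a bound stateful function; the typenames "example/greeter" and
"example/kinesis-egress" are placeholders, and SeenCount is the example
Protobuf message from tests/examples_pb2 in the diff above, not part of the
SDK itself.

    from statefun import StatefulFunctions, kinesis_egress_record
    from tests.examples_pb2 import SeenCount

    functions = StatefulFunctions()

    @functions.bind("example/greeter")
    def greeter(context, message):
        # Build the Protobuf value that will be carried as the record's value_bytes.
        seen = SeenCount()

        # stream and partition_key are required; explicit_hash_key may be omitted.
        record = kinesis_egress_record(stream="hello",
                                       partition_key="hello world",
                                       value=seen,
                                       explicit_hash_key="1234")

        # Hand the record to the Kinesis generic egress bound under this typename.
        context.pack_and_send_egress("example/kinesis-egress", record)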


[flink-statefun] 02/02: [hotfix] [py] Improve PyDoc for kafka_egress_record method

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 47e996d4fdabb8a0b45e047b88d92cbeb0a0395c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:53:24 2020 +0800

    [hotfix] [py] Improve PyDoc for kafka_egress_record method
---
 statefun-python-sdk/statefun/core.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index 1b14a82..50c1492 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -191,12 +191,12 @@ class StatefulFunctions:
 
 def kafka_egress_record(topic: str, value, key: str = None):
     """
-    Build a ProtobufMessage that can be emitted to a Protobuf based egress.
+    Build a ProtobufMessage that can be emitted to a Kafka generic egress.
 
-    :param topic: The kafka detention topic for that record
+    :param topic: The Kafka destination topic for that record
     :param key: the utf8 encoded string key to produce (can be empty)
     :param value: the Protobuf value to produce
-    :return: A Protobuf message represents the record to be produced via the kafka procurer.
+    :return: A Protobuf message representing the record to be produced via the Kafka generic egress.
     """
     if not topic:
         raise ValueError("A destination Kafka topic is missing")
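
As a usage note for the corrected PyDoc: kafka_egress_record follows the same
builder pattern as the new kinesis_egress_record. A minimal, illustrative
sketch (the topic name and key are placeholders, and SeenCount is the example
Protobuf message from the SDK tests):

    from statefun import kafka_egress_record
    from tests.examples_pb2 import SeenCount

    seen = SeenCount()  # the Protobuf value to produce

    # With an explicit key:
    with_key = kafka_egress_record(topic="greetings", value=seen, key="user-1")

    # Without a key (key defaults to None and is simply left unset on the record):
    without_key = kafka_egress_record(topic="greetings", value=seen)

Either record is then handed to the Kafka generic egress via
context.pack_and_send_egress, exactly as in the test shown for commit 01/02.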