Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/24 04:30:22 UTC
[flink-statefun] branch release-2.0 updated (b9752c8 -> ef036db)
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a change to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.
from b9752c8 [FLINK-16685] [py] Add a k8s example to statefun-python-sdk
new d7c2ec1 [FLINK-16723] Move python-k8s example to statefun-examples
new 7bff0b8 [FLINK-16569] [py] Allow empty keys in kafka_egress_builder method
new b9dfe68 [hotfix] [py] Rename kafka_egress_builder to kafka_egress_record
new ef036db [hotfix] [kafka] Non-set Kafka keys should be empty strings
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../statefun-python-k8s}/Dockerfile.python-worker | 0
.../statefun-python-k8s}/Dockerfile.statefun | 0
.../k8s => statefun-examples/statefun-python-k8s}/README.md | 0
.../statefun-python-k8s}/build-example.sh | 2 +-
.../statefun-python-k8s}/event-generator.py | 0
.../k8s => statefun-examples/statefun-python-k8s}/main.py | 4 ++--
.../k8s => statefun-examples/statefun-python-k8s}/messages.proto | 0
.../statefun-python-k8s}/messages_pb2.py | 0
.../k8s => statefun-examples/statefun-python-k8s}/module.yaml | 0
.../statefun-python-k8s}/requirements.txt | 0
.../statefun-python-k8s}/resources/Chart.yaml | 0
.../statefun-python-k8s}/resources/templates/config-map.yaml | 0
.../resources/templates/master-deployment.yaml | 0
.../resources/templates/master-rest-service.yaml | 0
.../statefun-python-k8s}/resources/templates/master-service.yaml | 0
.../resources/templates/python-worker-deployment.yaml | 0
.../resources/templates/python-worker-service.yaml | 0
.../resources/templates/worker-deployment.yaml | 0
.../statefun-python-k8s}/resources/values.yaml | 0
.../statefun/flink/io/kafka/GenericKafkaEgressSerializer.java | 2 +-
statefun-python-sdk/statefun/__init__.py | 2 +-
statefun-python-sdk/statefun/core.py | 9 ++++-----
statefun-python-sdk/tests/request_reply_test.py | 6 ++++--
23 files changed, 13 insertions(+), 12 deletions(-)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/Dockerfile.python-worker (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/Dockerfile.statefun (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/README.md (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/build-example.sh (93%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/event-generator.py (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/main.py (93%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/messages.proto (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/messages_pb2.py (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/module.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/requirements.txt (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/Chart.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/config-map.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/master-deployment.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/master-rest-service.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/master-service.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/python-worker-deployment.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/python-worker-service.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/worker-deployment.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/values.yaml (100%)
[flink-statefun] 01/04: [FLINK-16723] Move python-k8s example to statefun-examples
Posted by tz...@apache.org.
tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit d7c2ec1d725d2c6ba6e64e7eec60190c7bb648be
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Mar 23 12:42:58 2020 +0100
[FLINK-16723] Move python-k8s example to statefun-examples
This closes #67.
---
.../statefun-python-k8s}/Dockerfile.python-worker | 0
.../k8s => statefun-examples/statefun-python-k8s}/Dockerfile.statefun | 0
.../examples/k8s => statefun-examples/statefun-python-k8s}/README.md | 0
.../k8s => statefun-examples/statefun-python-k8s}/build-example.sh | 2 +-
.../k8s => statefun-examples/statefun-python-k8s}/event-generator.py | 0
.../examples/k8s => statefun-examples/statefun-python-k8s}/main.py | 0
.../k8s => statefun-examples/statefun-python-k8s}/messages.proto | 0
.../k8s => statefun-examples/statefun-python-k8s}/messages_pb2.py | 0
.../examples/k8s => statefun-examples/statefun-python-k8s}/module.yaml | 0
.../k8s => statefun-examples/statefun-python-k8s}/requirements.txt | 0
.../k8s => statefun-examples/statefun-python-k8s}/resources/Chart.yaml | 0
.../statefun-python-k8s}/resources/templates/config-map.yaml | 0
.../statefun-python-k8s}/resources/templates/master-deployment.yaml | 0
.../statefun-python-k8s}/resources/templates/master-rest-service.yaml | 0
.../statefun-python-k8s}/resources/templates/master-service.yaml | 0
.../resources/templates/python-worker-deployment.yaml | 0
.../statefun-python-k8s}/resources/templates/python-worker-service.yaml | 0
.../statefun-python-k8s}/resources/templates/worker-deployment.yaml | 0
.../k8s => statefun-examples/statefun-python-k8s}/resources/values.yaml | 0
19 files changed, 1 insertion(+), 1 deletion(-)
diff --git a/statefun-python-sdk/examples/k8s/Dockerfile.python-worker b/statefun-examples/statefun-python-k8s/Dockerfile.python-worker
similarity index 100%
rename from statefun-python-sdk/examples/k8s/Dockerfile.python-worker
rename to statefun-examples/statefun-python-k8s/Dockerfile.python-worker
diff --git a/statefun-python-sdk/examples/k8s/Dockerfile.statefun b/statefun-examples/statefun-python-k8s/Dockerfile.statefun
similarity index 100%
rename from statefun-python-sdk/examples/k8s/Dockerfile.statefun
rename to statefun-examples/statefun-python-k8s/Dockerfile.statefun
diff --git a/statefun-python-sdk/examples/k8s/README.md b/statefun-examples/statefun-python-k8s/README.md
similarity index 100%
rename from statefun-python-sdk/examples/k8s/README.md
rename to statefun-examples/statefun-python-k8s/README.md
diff --git a/statefun-python-sdk/examples/k8s/build-example.sh b/statefun-examples/statefun-python-k8s/build-example.sh
similarity index 93%
rename from statefun-python-sdk/examples/k8s/build-example.sh
rename to statefun-examples/statefun-python-k8s/build-example.sh
index 07e5fc0..0ba540d 100755
--- a/statefun-python-sdk/examples/k8s/build-example.sh
+++ b/statefun-examples/statefun-python-k8s/build-example.sh
@@ -26,7 +26,7 @@ rm -f apache_flink_statefun-*-py3-none-any.whl
rm -rf __pycache__
# copy the whl distribution, it must be first built by calling build-distribution.sh
-cp ../../dist/apache_flink_statefun-*-py3-none-any.whl apache_flink_statefun-snapshot-py3-none-any.whl 2>/dev/null
+cp ../../statefun-python-sdk/dist/apache_flink_statefun-*-py3-none-any.whl apache_flink_statefun-snapshot-py3-none-any.whl 2>/dev/null
rc=$?
if [[ ${rc} -ne 0 ]]; then
echo "Failed copying the whl distribution, please build the distribution first by calling ./build-distribution.sh"
diff --git a/statefun-python-sdk/examples/k8s/event-generator.py b/statefun-examples/statefun-python-k8s/event-generator.py
similarity index 100%
rename from statefun-python-sdk/examples/k8s/event-generator.py
rename to statefun-examples/statefun-python-k8s/event-generator.py
diff --git a/statefun-python-sdk/examples/k8s/main.py b/statefun-examples/statefun-python-k8s/main.py
similarity index 100%
rename from statefun-python-sdk/examples/k8s/main.py
rename to statefun-examples/statefun-python-k8s/main.py
diff --git a/statefun-python-sdk/examples/k8s/messages.proto b/statefun-examples/statefun-python-k8s/messages.proto
similarity index 100%
rename from statefun-python-sdk/examples/k8s/messages.proto
rename to statefun-examples/statefun-python-k8s/messages.proto
diff --git a/statefun-python-sdk/examples/k8s/messages_pb2.py b/statefun-examples/statefun-python-k8s/messages_pb2.py
similarity index 100%
rename from statefun-python-sdk/examples/k8s/messages_pb2.py
rename to statefun-examples/statefun-python-k8s/messages_pb2.py
diff --git a/statefun-python-sdk/examples/k8s/module.yaml b/statefun-examples/statefun-python-k8s/module.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/module.yaml
rename to statefun-examples/statefun-python-k8s/module.yaml
diff --git a/statefun-python-sdk/examples/k8s/requirements.txt b/statefun-examples/statefun-python-k8s/requirements.txt
similarity index 100%
rename from statefun-python-sdk/examples/k8s/requirements.txt
rename to statefun-examples/statefun-python-k8s/requirements.txt
diff --git a/statefun-python-sdk/examples/k8s/resources/Chart.yaml b/statefun-examples/statefun-python-k8s/resources/Chart.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/Chart.yaml
rename to statefun-examples/statefun-python-k8s/resources/Chart.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/config-map.yaml b/statefun-examples/statefun-python-k8s/resources/templates/config-map.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/config-map.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/config-map.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/master-deployment.yaml b/statefun-examples/statefun-python-k8s/resources/templates/master-deployment.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/master-deployment.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/master-deployment.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/master-rest-service.yaml b/statefun-examples/statefun-python-k8s/resources/templates/master-rest-service.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/master-rest-service.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/master-rest-service.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/master-service.yaml b/statefun-examples/statefun-python-k8s/resources/templates/master-service.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/master-service.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/master-service.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/python-worker-deployment.yaml b/statefun-examples/statefun-python-k8s/resources/templates/python-worker-deployment.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/python-worker-deployment.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/python-worker-deployment.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/python-worker-service.yaml b/statefun-examples/statefun-python-k8s/resources/templates/python-worker-service.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/python-worker-service.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/python-worker-service.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/worker-deployment.yaml b/statefun-examples/statefun-python-k8s/resources/templates/worker-deployment.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/worker-deployment.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/worker-deployment.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/values.yaml b/statefun-examples/statefun-python-k8s/resources/values.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/values.yaml
rename to statefun-examples/statefun-python-k8s/resources/values.yaml
[flink-statefun] 04/04: [hotfix] [kafka] Non-set Kafka keys should be empty strings
Posted by tz...@apache.org.
tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit ef036dba86413fdfa3ecf6af2201d349f8a43dc0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:23:04 2020 +0800
[hotfix] [kafka] Non-set Kafka keys should be empty strings
---
.../flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
index 19af022..e20bdf1 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
@@ -61,7 +61,7 @@ public final class GenericKafkaEgressSerializer implements KafkaEgressSerializer
final String topic = protobufProducerRecord.getTopic();
final byte[] valueBytes = protobufProducerRecord.getValueBytes().toByteArray();
- if (key == null) {
+ if (key == null || key.isEmpty()) {
return new ProducerRecord<>(topic, valueBytes);
} else {
return new ProducerRecord<>(topic, key.getBytes(StandardCharsets.UTF_8), valueBytes);
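The conditional changed above can be restated as a small Python sketch. It is not the project's code: `ProducerRecord` here is a hypothetical named tuple standing in for Kafka's Java class, and `to_producer_record` is an illustrative name. The point it shows is that an unset protobuf string field reads back as an empty string rather than null, so both cases should yield a record without a key.

```python
from typing import NamedTuple, Optional


class ProducerRecord(NamedTuple):
    """Hypothetical stand-in for Kafka's ProducerRecord (topic, key, value)."""
    topic: str
    key: Optional[bytes]
    value: bytes


def to_producer_record(topic: str, key: Optional[str], value_bytes: bytes) -> ProducerRecord:
    # Treat None and "" the same way: an unset protobuf string field
    # arrives as "", and neither should become a zero-length Kafka key.
    if key is None or key == "":
        return ProducerRecord(topic, None, value_bytes)
    # A real key is encoded as UTF-8 bytes, as in the Java serializer.
    return ProducerRecord(topic, key.encode("utf-8"), value_bytes)
```

With the pre-change check (`key == null` only), an empty key would have taken the second branch and produced a record keyed by zero bytes instead of a keyless record.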
[flink-statefun] 02/04: [FLINK-16569] [py] Allow empty keys in kafka_egress_builder method
Posted by tz...@apache.org.
tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 7bff0b877a628b376ede9afca72ed2c4d69c7bb5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:16:15 2020 +0800
[FLINK-16569] [py] Allow empty keys in kafka_egress_builder method
---
statefun-python-sdk/statefun/core.py | 9 ++++-----
statefun-python-sdk/tests/request_reply_test.py | 2 ++
2 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index 20fbf88..e962c6f 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -189,23 +189,22 @@ class StatefulFunctions:
return self.functions[(namespace, type)]
-def kafka_egress_builder(topic: str, key: str, value):
+def kafka_egress_builder(topic: str, value, key: str = None):
"""
Build a ProtobufMessage that can be emitted to a Protobuf based egress.
:param topic: The Kafka destination topic for that record
- :param key: the utf8 encoded string key to produce
+ :param key: the utf8 encoded string key to produce (can be empty)
:param value: the Protobuf value to produce
:return: A Protobuf message representing the record to be produced via the Kafka producer.
"""
if not topic:
raise ValueError("A destination Kafka topic is missing")
- if not key:
- raise ValueError("A key is missing")
if not value:
raise ValueError("Missing value")
record = KafkaProducerRecord()
record.topic = topic
- record.key = key
record.value_bytes = value.SerializeToString()
+ if key is not None:
+ record.key = key
return record
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index a749bcd..6c80d00 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -127,6 +127,8 @@ class RequestReplyTestCase(unittest.TestCase):
# kafka egress
context.pack_and_send_egress("sdk/kafka",
kafka_egress_builder(topic="hello", key=u"hello world", value=seen))
+ context.pack_and_send_egress("sdk/kafka",
+ kafka_egress_builder(topic="hello", value=seen))
#
# build the invocation
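The behavior introduced by this commit can be tried in isolation with the sketch below. It makes several assumptions: `FakeKafkaProducerRecord` is a hypothetical dataclass standing in for the SDK's generated protobuf `KafkaProducerRecord`, `kafka_egress_sketch` is an illustrative name, and `value` is passed as already-serialized bytes instead of a protobuf message. Only the validation and the optional-key handling mirror the diff.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeKafkaProducerRecord:
    """Hypothetical stand-in for the SDK's protobuf KafkaProducerRecord."""
    topic: str = ""
    key: str = ""  # mimics protobuf: an unset string field reads as ""
    value_bytes: bytes = b""


def kafka_egress_sketch(topic: str, value: bytes, key: Optional[str] = None):
    # Topic and value stay mandatory; only the key became optional.
    if not topic:
        raise ValueError("A destination Kafka topic is missing")
    if not value:
        raise ValueError("Missing value")
    record = FakeKafkaProducerRecord()
    record.topic = topic
    record.value_bytes = value
    # Assign the key only when the caller supplied one; otherwise the
    # field keeps its default empty string.
    if key is not None:
        record.key = key
    return record
```

Because `key` moved behind `value` in the signature, callers that passed the three arguments positionally in the old order would silently swap key and value; the keyword form used in the test above avoids that.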
[flink-statefun] 03/04: [hotfix] [py] Rename kafka_egress_builder to kafka_egress_record
Posted by tz...@apache.org.
tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b9dfe680a07699065d8c8aef6d58616a1aa9a1f7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:21:02 2020 +0800
[hotfix] [py] Rename kafka_egress_builder to kafka_egress_record
Renamed since the method doesn't create a builder, but a record
directly.
---
statefun-examples/statefun-python-k8s/main.py | 4 ++--
statefun-python-sdk/statefun/__init__.py | 2 +-
statefun-python-sdk/statefun/core.py | 2 +-
statefun-python-sdk/tests/request_reply_test.py | 6 +++---
4 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/statefun-examples/statefun-python-k8s/main.py b/statefun-examples/statefun-python-k8s/main.py
index 6867dd8..67d3fce 100644
--- a/statefun-examples/statefun-python-k8s/main.py
+++ b/statefun-examples/statefun-python-k8s/main.py
@@ -21,7 +21,7 @@ from messages_pb2 import SeenCount
from statefun import StatefulFunctions
from statefun import RequestReplyHandler
-from statefun import kafka_egress_builder
+from statefun import kafka_egress_record
functions = StatefulFunctions()
@@ -36,7 +36,7 @@ def greet(context, message: LoginEvent):
state.seen += 1
context.state('seen_count').pack(state)
- egress_message = kafka_egress_builder(topic="seen", key=message.user_name, value=state)
+ egress_message = kafka_egress_record(topic="seen", key=message.user_name, value=state)
context.pack_and_send_egress("k8s-demo/greets-egress", egress_message)
diff --git a/statefun-python-sdk/statefun/__init__.py b/statefun-python-sdk/statefun/__init__.py
index cbde207..c0268fa 100644
--- a/statefun-python-sdk/statefun/__init__.py
+++ b/statefun-python-sdk/statefun/__init__.py
@@ -19,4 +19,4 @@
from statefun.core import StatefulFunctions
from statefun.request_reply import RequestReplyHandler
-from statefun.core import kafka_egress_builder
\ No newline at end of file
+from statefun.core import kafka_egress_record
\ No newline at end of file
diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index e962c6f..0a80e94 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -189,7 +189,7 @@ class StatefulFunctions:
return self.functions[(namespace, type)]
-def kafka_egress_builder(topic: str, value, key: str = None):
+def kafka_egress_record(topic: str, value, key: str = None):
"""
Build a ProtobufMessage that can be emitted to a Protobuf based egress.
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index 6c80d00..3e7437a 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -25,7 +25,7 @@ from google.protobuf.any_pb2 import Any
from tests.examples_pb2 import LoginEvent, SeenCount
from statefun.request_reply_pb2 import ToFunction, FromFunction
from statefun import RequestReplyHandler
-from statefun.core import StatefulFunctions, kafka_egress_builder
+from statefun.core import StatefulFunctions, kafka_egress_record
class InvocationBuilder(object):
@@ -126,9 +126,9 @@ class RequestReplyTestCase(unittest.TestCase):
# kafka egress
context.pack_and_send_egress("sdk/kafka",
- kafka_egress_builder(topic="hello", key=u"hello world", value=seen))
+ kafka_egress_record(topic="hello", key=u"hello world", value=seen))
context.pack_and_send_egress("sdk/kafka",
- kafka_egress_builder(topic="hello", value=seen))
+ kafka_egress_record(topic="hello", value=seen))
#
# build the invocation