You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/24 04:26:59 UTC
[flink-statefun] branch master updated (6cbef16 -> 34bcee8)
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.
from 6cbef16 [release] Update version to 2.1-SNAPSHOT
new a1085a7 [FLINK-16723] Move python-k8s example to statefun-examples
new d74c1b2 [FLINK-16569] [py] Allow empty keys in kafka_egress_builder method
new b92f6d1 [hotfix] [py] Rename kafka_egress_builder to kafka_egress_record
new 34bcee8 [hotfix] [kafka] Non-set Kafka keys should be empty strings
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../statefun-python-k8s}/Dockerfile.python-worker | 0
.../statefun-python-k8s}/Dockerfile.statefun | 0
.../k8s => statefun-examples/statefun-python-k8s}/README.md | 0
.../statefun-python-k8s}/build-example.sh | 2 +-
.../statefun-python-k8s}/event-generator.py | 0
.../k8s => statefun-examples/statefun-python-k8s}/main.py | 4 ++--
.../k8s => statefun-examples/statefun-python-k8s}/messages.proto | 0
.../statefun-python-k8s}/messages_pb2.py | 0
.../k8s => statefun-examples/statefun-python-k8s}/module.yaml | 0
.../statefun-python-k8s}/requirements.txt | 0
.../statefun-python-k8s}/resources/Chart.yaml | 0
.../statefun-python-k8s}/resources/templates/config-map.yaml | 0
.../resources/templates/master-deployment.yaml | 0
.../resources/templates/master-rest-service.yaml | 0
.../statefun-python-k8s}/resources/templates/master-service.yaml | 0
.../resources/templates/python-worker-deployment.yaml | 0
.../resources/templates/python-worker-service.yaml | 0
.../resources/templates/worker-deployment.yaml | 0
.../statefun-python-k8s}/resources/values.yaml | 0
.../statefun/flink/io/kafka/GenericKafkaEgressSerializer.java | 2 +-
statefun-python-sdk/statefun/__init__.py | 2 +-
statefun-python-sdk/statefun/core.py | 9 ++++-----
statefun-python-sdk/tests/request_reply_test.py | 6 ++++--
23 files changed, 13 insertions(+), 12 deletions(-)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/Dockerfile.python-worker (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/Dockerfile.statefun (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/README.md (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/build-example.sh (93%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/event-generator.py (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/main.py (93%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/messages.proto (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/messages_pb2.py (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/module.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/requirements.txt (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/Chart.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/config-map.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/master-deployment.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/master-rest-service.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/master-service.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/python-worker-deployment.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/python-worker-service.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/templates/worker-deployment.yaml (100%)
rename {statefun-python-sdk/examples/k8s => statefun-examples/statefun-python-k8s}/resources/values.yaml (100%)
[flink-statefun] 03/04: [hotfix] [py] Rename kafka_egress_builder
to kafka_egress_record
Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b92f6d16e38e952f7e70a472fe3e075da0385fdb
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:21:02 2020 +0800
[hotfix] [py] Rename kafka_egress_builder to kafka_egress_record
Renamed since the method doesn't create a builder, but a record
directly.
---
statefun-examples/statefun-python-k8s/main.py | 4 ++--
statefun-python-sdk/statefun/__init__.py | 2 +-
statefun-python-sdk/statefun/core.py | 2 +-
statefun-python-sdk/tests/request_reply_test.py | 6 +++---
4 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/statefun-examples/statefun-python-k8s/main.py b/statefun-examples/statefun-python-k8s/main.py
index 6867dd8..67d3fce 100644
--- a/statefun-examples/statefun-python-k8s/main.py
+++ b/statefun-examples/statefun-python-k8s/main.py
@@ -21,7 +21,7 @@ from messages_pb2 import SeenCount
from statefun import StatefulFunctions
from statefun import RequestReplyHandler
-from statefun import kafka_egress_builder
+from statefun import kafka_egress_record
functions = StatefulFunctions()
@@ -36,7 +36,7 @@ def greet(context, message: LoginEvent):
state.seen += 1
context.state('seen_count').pack(state)
- egress_message = kafka_egress_builder(topic="seen", key=message.user_name, value=state)
+ egress_message = kafka_egress_record(topic="seen", key=message.user_name, value=state)
context.pack_and_send_egress("k8s-demo/greets-egress", egress_message)
diff --git a/statefun-python-sdk/statefun/__init__.py b/statefun-python-sdk/statefun/__init__.py
index cbde207..c0268fa 100644
--- a/statefun-python-sdk/statefun/__init__.py
+++ b/statefun-python-sdk/statefun/__init__.py
@@ -19,4 +19,4 @@
from statefun.core import StatefulFunctions
from statefun.request_reply import RequestReplyHandler
-from statefun.core import kafka_egress_builder
\ No newline at end of file
+from statefun.core import kafka_egress_record
\ No newline at end of file
diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index e962c6f..0a80e94 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -189,7 +189,7 @@ class StatefulFunctions:
return self.functions[(namespace, type)]
-def kafka_egress_builder(topic: str, value, key: str = None):
+def kafka_egress_record(topic: str, value, key: str = None):
"""
Build a ProtobufMessage that can be emitted to a Protobuf based egress.
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index 6c80d00..3e7437a 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -25,7 +25,7 @@ from google.protobuf.any_pb2 import Any
from tests.examples_pb2 import LoginEvent, SeenCount
from statefun.request_reply_pb2 import ToFunction, FromFunction
from statefun import RequestReplyHandler
-from statefun.core import StatefulFunctions, kafka_egress_builder
+from statefun.core import StatefulFunctions, kafka_egress_record
class InvocationBuilder(object):
@@ -126,9 +126,9 @@ class RequestReplyTestCase(unittest.TestCase):
# kafka egress
context.pack_and_send_egress("sdk/kafka",
- kafka_egress_builder(topic="hello", key=u"hello world", value=seen))
+ kafka_egress_record(topic="hello", key=u"hello world", value=seen))
context.pack_and_send_egress("sdk/kafka",
- kafka_egress_builder(topic="hello", value=seen))
+ kafka_egress_record(topic="hello", value=seen))
#
# build the invocation
[flink-statefun] 04/04: [hotfix] [kafka] Non-set Kafka keys should
be empty strings
Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 34bcee8ffe5dff7c52ab0551021c82334fb15e98
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:23:04 2020 +0800
[hotfix] [kafka] Non-set Kafka keys should be empty strings
---
.../flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
index 19af022..e20bdf1 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
@@ -61,7 +61,7 @@ public final class GenericKafkaEgressSerializer implements KafkaEgressSerializer
final String topic = protobufProducerRecord.getTopic();
final byte[] valueBytes = protobufProducerRecord.getValueBytes().toByteArray();
- if (key == null) {
+ if (key == null || key.isEmpty()) {
return new ProducerRecord<>(topic, valueBytes);
} else {
return new ProducerRecord<>(topic, key.getBytes(StandardCharsets.UTF_8), valueBytes);
[flink-statefun] 01/04: [FLINK-16723] Move python-k8s example to
statefun-examples
Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit a1085a7ace148dc7a6e19e052fa259b3381cfcb9
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Mon Mar 23 12:42:58 2020 +0100
[FLINK-16723] Move python-k8s example to statefun-examples
This closes #67.
---
.../statefun-python-k8s}/Dockerfile.python-worker | 0
.../k8s => statefun-examples/statefun-python-k8s}/Dockerfile.statefun | 0
.../examples/k8s => statefun-examples/statefun-python-k8s}/README.md | 0
.../k8s => statefun-examples/statefun-python-k8s}/build-example.sh | 2 +-
.../k8s => statefun-examples/statefun-python-k8s}/event-generator.py | 0
.../examples/k8s => statefun-examples/statefun-python-k8s}/main.py | 0
.../k8s => statefun-examples/statefun-python-k8s}/messages.proto | 0
.../k8s => statefun-examples/statefun-python-k8s}/messages_pb2.py | 0
.../examples/k8s => statefun-examples/statefun-python-k8s}/module.yaml | 0
.../k8s => statefun-examples/statefun-python-k8s}/requirements.txt | 0
.../k8s => statefun-examples/statefun-python-k8s}/resources/Chart.yaml | 0
.../statefun-python-k8s}/resources/templates/config-map.yaml | 0
.../statefun-python-k8s}/resources/templates/master-deployment.yaml | 0
.../statefun-python-k8s}/resources/templates/master-rest-service.yaml | 0
.../statefun-python-k8s}/resources/templates/master-service.yaml | 0
.../resources/templates/python-worker-deployment.yaml | 0
.../statefun-python-k8s}/resources/templates/python-worker-service.yaml | 0
.../statefun-python-k8s}/resources/templates/worker-deployment.yaml | 0
.../k8s => statefun-examples/statefun-python-k8s}/resources/values.yaml | 0
19 files changed, 1 insertion(+), 1 deletion(-)
diff --git a/statefun-python-sdk/examples/k8s/Dockerfile.python-worker b/statefun-examples/statefun-python-k8s/Dockerfile.python-worker
similarity index 100%
rename from statefun-python-sdk/examples/k8s/Dockerfile.python-worker
rename to statefun-examples/statefun-python-k8s/Dockerfile.python-worker
diff --git a/statefun-python-sdk/examples/k8s/Dockerfile.statefun b/statefun-examples/statefun-python-k8s/Dockerfile.statefun
similarity index 100%
rename from statefun-python-sdk/examples/k8s/Dockerfile.statefun
rename to statefun-examples/statefun-python-k8s/Dockerfile.statefun
diff --git a/statefun-python-sdk/examples/k8s/README.md b/statefun-examples/statefun-python-k8s/README.md
similarity index 100%
rename from statefun-python-sdk/examples/k8s/README.md
rename to statefun-examples/statefun-python-k8s/README.md
diff --git a/statefun-python-sdk/examples/k8s/build-example.sh b/statefun-examples/statefun-python-k8s/build-example.sh
similarity index 93%
rename from statefun-python-sdk/examples/k8s/build-example.sh
rename to statefun-examples/statefun-python-k8s/build-example.sh
index 07e5fc0..0ba540d 100755
--- a/statefun-python-sdk/examples/k8s/build-example.sh
+++ b/statefun-examples/statefun-python-k8s/build-example.sh
@@ -26,7 +26,7 @@ rm -f apache_flink_statefun-*-py3-none-any.whl
rm -rf __pycache__
# copy the whl distribution, it must be first built by calling build-distribution.sh
-cp ../../dist/apache_flink_statefun-*-py3-none-any.whl apache_flink_statefun-snapshot-py3-none-any.whl 2>/dev/null
+cp ../../statefun-python-sdk/dist/apache_flink_statefun-*-py3-none-any.whl apache_flink_statefun-snapshot-py3-none-any.whl 2>/dev/null
rc=$?
if [[ ${rc} -ne 0 ]]; then
echo "Failed copying the whl distribution, please build the distribution first by calling ./build-distribution.sh"
diff --git a/statefun-python-sdk/examples/k8s/event-generator.py b/statefun-examples/statefun-python-k8s/event-generator.py
similarity index 100%
rename from statefun-python-sdk/examples/k8s/event-generator.py
rename to statefun-examples/statefun-python-k8s/event-generator.py
diff --git a/statefun-python-sdk/examples/k8s/main.py b/statefun-examples/statefun-python-k8s/main.py
similarity index 100%
rename from statefun-python-sdk/examples/k8s/main.py
rename to statefun-examples/statefun-python-k8s/main.py
diff --git a/statefun-python-sdk/examples/k8s/messages.proto b/statefun-examples/statefun-python-k8s/messages.proto
similarity index 100%
rename from statefun-python-sdk/examples/k8s/messages.proto
rename to statefun-examples/statefun-python-k8s/messages.proto
diff --git a/statefun-python-sdk/examples/k8s/messages_pb2.py b/statefun-examples/statefun-python-k8s/messages_pb2.py
similarity index 100%
rename from statefun-python-sdk/examples/k8s/messages_pb2.py
rename to statefun-examples/statefun-python-k8s/messages_pb2.py
diff --git a/statefun-python-sdk/examples/k8s/module.yaml b/statefun-examples/statefun-python-k8s/module.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/module.yaml
rename to statefun-examples/statefun-python-k8s/module.yaml
diff --git a/statefun-python-sdk/examples/k8s/requirements.txt b/statefun-examples/statefun-python-k8s/requirements.txt
similarity index 100%
rename from statefun-python-sdk/examples/k8s/requirements.txt
rename to statefun-examples/statefun-python-k8s/requirements.txt
diff --git a/statefun-python-sdk/examples/k8s/resources/Chart.yaml b/statefun-examples/statefun-python-k8s/resources/Chart.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/Chart.yaml
rename to statefun-examples/statefun-python-k8s/resources/Chart.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/config-map.yaml b/statefun-examples/statefun-python-k8s/resources/templates/config-map.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/config-map.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/config-map.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/master-deployment.yaml b/statefun-examples/statefun-python-k8s/resources/templates/master-deployment.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/master-deployment.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/master-deployment.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/master-rest-service.yaml b/statefun-examples/statefun-python-k8s/resources/templates/master-rest-service.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/master-rest-service.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/master-rest-service.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/master-service.yaml b/statefun-examples/statefun-python-k8s/resources/templates/master-service.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/master-service.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/master-service.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/python-worker-deployment.yaml b/statefun-examples/statefun-python-k8s/resources/templates/python-worker-deployment.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/python-worker-deployment.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/python-worker-deployment.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/python-worker-service.yaml b/statefun-examples/statefun-python-k8s/resources/templates/python-worker-service.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/python-worker-service.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/python-worker-service.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/templates/worker-deployment.yaml b/statefun-examples/statefun-python-k8s/resources/templates/worker-deployment.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/templates/worker-deployment.yaml
rename to statefun-examples/statefun-python-k8s/resources/templates/worker-deployment.yaml
diff --git a/statefun-python-sdk/examples/k8s/resources/values.yaml b/statefun-examples/statefun-python-k8s/resources/values.yaml
similarity index 100%
rename from statefun-python-sdk/examples/k8s/resources/values.yaml
rename to statefun-examples/statefun-python-k8s/resources/values.yaml
[flink-statefun] 02/04: [FLINK-16569] [py] Allow empty keys in
kafka_egress_builder method
Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit d74c1b24da0ab8621924b50756ef6d68e30aafb7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 24 12:16:15 2020 +0800
[FLINK-16569] [py] Allow empty keys in kafka_egress_builder method
---
statefun-python-sdk/statefun/core.py | 9 ++++-----
statefun-python-sdk/tests/request_reply_test.py | 2 ++
2 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/statefun-python-sdk/statefun/core.py b/statefun-python-sdk/statefun/core.py
index 20fbf88..e962c6f 100644
--- a/statefun-python-sdk/statefun/core.py
+++ b/statefun-python-sdk/statefun/core.py
@@ -189,23 +189,22 @@ class StatefulFunctions:
return self.functions[(namespace, type)]
-def kafka_egress_builder(topic: str, key: str, value):
+def kafka_egress_builder(topic: str, value, key: str = None):
"""
Build a ProtobufMessage that can be emitted to a Protobuf based egress.
:param topic: The kafka detention topic for that record
- :param key: the utf8 encoded string key to produce
+ :param key: the utf8 encoded string key to produce (can be empty)
:param value: the Protobuf value to produce
:return: A Protobuf message represents the record to be produced via the kafka procurer.
"""
if not topic:
raise ValueError("A destination Kafka topic is missing")
- if not key:
- raise ValueError("A key is missing")
if not value:
raise ValueError("Missing value")
record = KafkaProducerRecord()
record.topic = topic
- record.key = key
record.value_bytes = value.SerializeToString()
+ if key is not None:
+ record.key = key
return record
diff --git a/statefun-python-sdk/tests/request_reply_test.py b/statefun-python-sdk/tests/request_reply_test.py
index a749bcd..6c80d00 100644
--- a/statefun-python-sdk/tests/request_reply_test.py
+++ b/statefun-python-sdk/tests/request_reply_test.py
@@ -127,6 +127,8 @@ class RequestReplyTestCase(unittest.TestCase):
# kafka egress
context.pack_and_send_egress("sdk/kafka",
kafka_egress_builder(topic="hello", key=u"hello world", value=seen))
+ context.pack_and_send_egress("sdk/kafka",
+ kafka_egress_builder(topic="hello", value=seen))
#
# build the invocation