You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/17 07:40:54 UTC
[flink-statefun] 02/03: [FLINK-19256] [datastream] Improve
DataStream API example
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 6b097054147d7cd4a0a32267768867e0e69d675c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Sep 16 14:34:38 2020 +0800
[FLINK-19256] [datastream] Improve DataStream API example
---
.../flink/statefun/examples/datastream/Example.java | 17 ++---------------
1 file changed, 2 insertions(+), 15 deletions(-)
diff --git a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
index f850642..76ffde5 100644
--- a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
+++ b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
@@ -18,8 +18,6 @@
package org.apache.flink.statefun.examples.datastream;
-import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
-import static org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER;
import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;
import java.net.URI;
@@ -27,7 +25,6 @@ import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
import org.apache.flink.statefun.flink.core.message.RoutableMessage;
@@ -55,22 +52,12 @@ public class Example {
public static void main(String... args) throws Exception {
// -----------------------------------------------------------------------------------------
- // set stateful function related configuration in flink-conf.yaml
- // -----------------------------------------------------------------------------------------
-
- Configuration configuration = new Configuration();
- configuration.set(USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
- configuration.set(
- ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
- "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
-
- StatefulFunctionsConfig statefunConfig = new StatefulFunctionsConfig(configuration);
-
- // -----------------------------------------------------------------------------------------
// obtain the stream execution env and create some data streams
// -----------------------------------------------------------------------------------------
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
+ statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
DataStream<RoutableMessage> names =
env.addSource(new NameSource())