You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/17 07:40:54 UTC

[flink-statefun] 02/03: [FLINK-19256] [datastream] Improve DataStream API example

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 6b097054147d7cd4a0a32267768867e0e69d675c
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Sep 16 14:34:38 2020 +0800

    [FLINK-19256] [datastream] Improve DataStream API example
---
 .../flink/statefun/examples/datastream/Example.java     | 17 ++---------------
 1 file changed, 2 insertions(+), 15 deletions(-)

diff --git a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
index f850642..76ffde5 100644
--- a/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
+++ b/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.statefun.examples.datastream;
 
-import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
-import static org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER;
 import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;
 
 import java.net.URI;
@@ -27,7 +25,6 @@ import java.time.Duration;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
 import org.apache.flink.statefun.flink.core.message.RoutableMessage;
@@ -55,22 +52,12 @@ public class Example {
   public static void main(String... args) throws Exception {
 
     // -----------------------------------------------------------------------------------------
-    // set stateful function related configuration in flink-conf.yaml
-    // -----------------------------------------------------------------------------------------
-
-    Configuration configuration = new Configuration();
-    configuration.set(USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
-    configuration.set(
-        ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
-        "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
-
-    StatefulFunctionsConfig statefunConfig = new StatefulFunctionsConfig(configuration);
-
-    // -----------------------------------------------------------------------------------------
     // obtain the stream execution env and create some data streams
     // -----------------------------------------------------------------------------------------
 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
+    statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
 
     DataStream<RoutableMessage> names =
         env.addSource(new NameSource())