You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/04 04:54:50 UTC

[flink-statefun] 02/02: [FLINK-16326] [harness] Adapt Harness to use valid Flink configurations

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 7b0970812f69d2140cd5a3c5087ff399f57a6487
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 3 18:12:56 2020 +0800

    [FLINK-16326] [harness] Adapt Harness to use valid Flink configurations
    
    This closes #47.
---
 .../flink/core/StatefulFunctionsConfig.java        |  5 ----
 .../flink/statefun/flink/harness/Harness.java      | 27 +++++++++++++++++-----
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 7486fa3..293f917 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -154,11 +154,6 @@ public class StatefulFunctionsConfig implements Serializable {
     }
   }
 
-  /** Create a new configuration object with default values. */
-  public StatefulFunctionsConfig() {
-    this(new Configuration());
-  }
-
   /** Returns the factory type used to serialize messages. */
   public MessageFactoryType getFactoryType() {
     return factoryType;
diff --git a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
index 3d1e23f..6fe3856 100644
--- a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
+++ b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
@@ -21,7 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsJob;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseProvider;
@@ -36,19 +38,19 @@ import org.apache.flink.statefun.sdk.io.EgressIdentifier;
 import org.apache.flink.statefun.sdk.io.EgressSpec;
 import org.apache.flink.statefun.sdk.io.IngressIdentifier;
 import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 public class Harness {
-  private final StatefulFunctionsConfig stateFunConfig;
-
   private final Configuration flinkConfig;
 
+  private final Map<String, String> globalConfigurations = new HashMap<>();
+
   private final Map<IngressIdentifier<?>, IngressSpec<?>> overrideIngress = new HashMap<>();
   private final Map<EgressIdentifier<?>, EgressSpec<?>> overrideEgress = new HashMap<>();
 
   public Harness() {
-    stateFunConfig = new StatefulFunctionsConfig();
     flinkConfig = new Configuration();
   }
 
@@ -83,13 +85,14 @@ public class Harness {
   }
 
   public Harness withKryoMessageSerializer() {
-    stateFunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
+    flinkConfig.set(
+        StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
     return this;
   }
 
   /** Set the name used in the Flink UI. */
   public Harness withFlinkJobName(String flinkJobName) {
-    stateFunConfig.setFlinkJobName(flinkJobName);
+    flinkConfig.set(StatefulFunctionsConfig.FLINK_JOB_NAME, flinkJobName);
     return this;
   }
 
@@ -104,18 +107,21 @@ public class Harness {
    * org.apache.flink.statefun.sdk.spi.StatefulFunctionModule} on configure.
    */
   public Harness withGlobalConfiguration(String key, String value) {
-    stateFunConfig.setGlobalConfiguration(key, value);
+    globalConfigurations.put(key, value);
     return this;
   }
 
   public void start() throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+    configureStrictlyRequiredFlinkConfigs(flinkConfig);
     // Configure will change the value of a setting only if a corresponding option was set in the
     // underlying configuration. If a key is not present, the current value of a field will remain
     // untouched.
     env.configure(flinkConfig, Thread.currentThread().getContextClassLoader());
 
+    StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConfig);
+    stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
     stateFunConfig.setProvider(new HarnessProvider(overrideIngress, overrideEgress));
     StatefulFunctionsJob.main(env, stateFunConfig);
   }
@@ -153,4 +159,13 @@ public class Harness {
       System.out.println(t);
     }
   }
+
+  private static void configureStrictlyRequiredFlinkConfigs(Configuration flinkConfig) {
+    flinkConfig.set(
+        CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+        String.join(";", StatefulFunctionsConfigValidator.PARENT_FIRST_CLASSLOADER_PATTERNS));
+    flinkConfig.set(
+        ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
+        StatefulFunctionsConfigValidator.MAX_CONCURRENT_CHECKPOINTS);
+  }
 }