You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/03/04 04:54:50 UTC
[flink-statefun] 02/02: [FLINK-16326] [harness] Adapt Harness to use valid Flink configurations
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 7b0970812f69d2140cd5a3c5087ff399f57a6487
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Mar 3 18:12:56 2020 +0800
[FLINK-16326] [harness] Adapt Harness to use valid Flink configurations
This closes #47.
---
.../flink/core/StatefulFunctionsConfig.java | 5 ----
.../flink/statefun/flink/harness/Harness.java | 27 +++++++++++++++++-----
2 files changed, 21 insertions(+), 11 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 7486fa3..293f917 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -154,11 +154,6 @@ public class StatefulFunctionsConfig implements Serializable {
}
}
- /** Create a new configuration object with default values. */
- public StatefulFunctionsConfig() {
- this(new Configuration());
- }
-
/** Returns the factory type used to serialize messages. */
public MessageFactoryType getFactoryType() {
return factoryType;
diff --git a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
index 3d1e23f..6fe3856 100644
--- a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
+++ b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
@@ -21,7 +21,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
+import org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator;
import org.apache.flink.statefun.flink.core.StatefulFunctionsJob;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseProvider;
@@ -36,19 +38,19 @@ import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class Harness {
- private final StatefulFunctionsConfig stateFunConfig;
-
private final Configuration flinkConfig;
+ private final Map<String, String> globalConfigurations = new HashMap<>();
+
private final Map<IngressIdentifier<?>, IngressSpec<?>> overrideIngress = new HashMap<>();
private final Map<EgressIdentifier<?>, EgressSpec<?>> overrideEgress = new HashMap<>();
public Harness() {
- stateFunConfig = new StatefulFunctionsConfig();
flinkConfig = new Configuration();
}
@@ -83,13 +85,14 @@ public class Harness {
}
public Harness withKryoMessageSerializer() {
- stateFunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
+ flinkConfig.set(
+ StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
return this;
}
/** Set the name used in the Flink UI. */
public Harness withFlinkJobName(String flinkJobName) {
- stateFunConfig.setFlinkJobName(flinkJobName);
+ flinkConfig.set(StatefulFunctionsConfig.FLINK_JOB_NAME, flinkJobName);
return this;
}
@@ -104,18 +107,21 @@ public class Harness {
* org.apache.flink.statefun.sdk.spi.StatefulFunctionModule} on configure.
*/
public Harness withGlobalConfiguration(String key, String value) {
- stateFunConfig.setGlobalConfiguration(key, value);
+ globalConfigurations.put(key, value);
return this;
}
public void start() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ configureStrictlyRequiredFlinkConfigs(flinkConfig);
// Configure will change the value of a setting only if a corresponding option was set in the
// underlying configuration. If a key is not present, the current value of a field will remain
// untouched.
env.configure(flinkConfig, Thread.currentThread().getContextClassLoader());
+ StatefulFunctionsConfig stateFunConfig = new StatefulFunctionsConfig(flinkConfig);
+ stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
stateFunConfig.setProvider(new HarnessProvider(overrideIngress, overrideEgress));
StatefulFunctionsJob.main(env, stateFunConfig);
}
@@ -153,4 +159,13 @@ public class Harness {
System.out.println(t);
}
}
+
+ private static void configureStrictlyRequiredFlinkConfigs(Configuration flinkConfig) {
+ flinkConfig.set(
+ CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+ String.join(";", StatefulFunctionsConfigValidator.PARENT_FIRST_CLASSLOADER_PATTERNS));
+ flinkConfig.set(
+ ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
+ StatefulFunctionsConfigValidator.MAX_CONCURRENT_CHECKPOINTS);
+ }
}