You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 08:09:44 UTC
[flink-statefun] 02/02: [FLINK-17963] [core] Revert execution
environment patching in StatefulFunctionsJob
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 89fdde49b19919375b74f006b528fb8f3ac76e15
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 27 18:11:29 2020 +0800
[FLINK-17963] [core] Revert execution environment patching in StatefulFunctionsJob
This is a revert of FLINK-16926, which was a temporary workaround for
the problem back in Flink 1.10.0 where values present in flink-conf.yaml
were not being respected via the StreamPlanEnvironment, which was used
when using a JobClusterEntrypoint to start the job.
This closes #118.
---
.../statefun/flink/core/StatefulFunctionsConfig.java | 11 -----------
.../flink/statefun/flink/core/StatefulFunctionsJob.java | 16 ----------------
2 files changed, 27 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 3487d35..293f917 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -120,8 +120,6 @@ public class StatefulFunctionsConfig implements Serializable {
}
}
- private final Configuration flinkConfiguration;
-
private MessageFactoryType factoryType;
private String flinkJobName;
@@ -141,7 +139,6 @@ public class StatefulFunctionsConfig implements Serializable {
*/
public StatefulFunctionsConfig(Configuration configuration) {
StatefulFunctionsConfigValidator.validate(configuration);
- this.flinkConfiguration = configuration;
this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
this.flinkJobName = configuration.get(FLINK_JOB_NAME);
@@ -243,12 +240,4 @@ public class StatefulFunctionsConfig implements Serializable {
public void setGlobalConfiguration(String key, String value) {
this.globalConfigurations.put(key, value);
}
-
- /**
- * Returns the underlying Flink configuration that used to initialize this {@link
- * StatefulFunctionsConfig}.
- */
- public Configuration getFlinkConfiguration() {
- return flinkConfiguration;
- }
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 25ed940..57ddfc6 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamPlanEnvironment;
public class StatefulFunctionsJob {
@@ -53,7 +52,6 @@ public class StatefulFunctionsJob {
Objects.requireNonNull(stateFunConfig);
setDefaultContextClassLoaderIfAbsent();
- configureExecutionEnvironment(env, stateFunConfig);
env.getConfig().enableObjectReuse();
@@ -71,20 +69,6 @@ public class StatefulFunctionsJob {
env.execute(stateFunConfig.getFlinkJobName());
}
- private static void configureExecutionEnvironment(
- StreamExecutionEnvironment env, StatefulFunctionsConfig stateFunConfig) {
- if (!(env instanceof StreamPlanEnvironment)) {
- return;
- }
- // This is a workaround until FLINK-16560 would be resolved.
- // When submitting the Job via StatefulFunctionsClusterEntryPoint (an adopted version of a
- // JobClusterEntryPoint) The resulting StreamExecutionEnvironment is started with an empty
- // configuration object, hence might miss important config options set at flink-conf.yaml.
- ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
- Objects.requireNonNull(contextClassLoader);
- env.configure(stateFunConfig.getFlinkConfiguration(), contextClassLoader);
- }
-
private static void setDefaultContextClassLoaderIfAbsent() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {