You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/05/28 08:09:44 UTC

[flink-statefun] 02/02: [FLINK-17963] [core] Revert execution environment patching in StatefulFunctionsJob

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 89fdde49b19919375b74f006b528fb8f3ac76e15
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed May 27 18:11:29 2020 +0800

    [FLINK-17963] [core] Revert execution environment patching in StatefulFunctionsJob
    
    This reverts FLINK-16926, a temporary workaround for a problem in
    Flink 1.10.0 where values set in flink-conf.yaml were not respected by
    the StreamPlanEnvironment, the environment used when a job is started
    via a JobClusterEntrypoint.
    
    This closes #118.
---
 .../statefun/flink/core/StatefulFunctionsConfig.java     | 11 -----------
 .../flink/statefun/flink/core/StatefulFunctionsJob.java  | 16 ----------------
 2 files changed, 27 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 3487d35..293f917 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -120,8 +120,6 @@ public class StatefulFunctionsConfig implements Serializable {
     }
   }
 
-  private final Configuration flinkConfiguration;
-
   private MessageFactoryType factoryType;
 
   private String flinkJobName;
@@ -141,7 +139,6 @@ public class StatefulFunctionsConfig implements Serializable {
    */
   public StatefulFunctionsConfig(Configuration configuration) {
     StatefulFunctionsConfigValidator.validate(configuration);
-    this.flinkConfiguration = configuration;
 
     this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
@@ -243,12 +240,4 @@ public class StatefulFunctionsConfig implements Serializable {
   public void setGlobalConfiguration(String key, String value) {
     this.globalConfigurations.put(key, value);
   }
-
-  /**
-   * Returns the underlying Flink configuration that used to initialize this {@link
-   * StatefulFunctionsConfig}.
-   */
-  public Configuration getFlinkConfiguration() {
-    return flinkConfiguration;
-  }
 }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 25ed940..57ddfc6 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamPlanEnvironment;
 
 public class StatefulFunctionsJob {
 
@@ -53,7 +52,6 @@ public class StatefulFunctionsJob {
     Objects.requireNonNull(stateFunConfig);
 
     setDefaultContextClassLoaderIfAbsent();
-    configureExecutionEnvironment(env, stateFunConfig);
 
     env.getConfig().enableObjectReuse();
 
@@ -71,20 +69,6 @@ public class StatefulFunctionsJob {
     env.execute(stateFunConfig.getFlinkJobName());
   }
 
-  private static void configureExecutionEnvironment(
-      StreamExecutionEnvironment env, StatefulFunctionsConfig stateFunConfig) {
-    if (!(env instanceof StreamPlanEnvironment)) {
-      return;
-    }
-    // This is a workaround until FLINK-16560 would be resolved.
-    // When submitting the Job via StatefulFunctionsClusterEntryPoint (an adopted version of a
-    // JobClusterEntryPoint) The resulting StreamExecutionEnvironment is started with an empty
-    // configuration object, hence might miss important config options set at flink-conf.yaml.
-    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-    Objects.requireNonNull(contextClassLoader);
-    env.configure(stateFunConfig.getFlinkConfiguration(), contextClassLoader);
-  }
-
   private static void setDefaultContextClassLoaderIfAbsent() {
     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {