You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/17 07:40:53 UTC

[flink-statefun] 01/03: [FLINK-19256] [core] Move Flink config validation out of StatefulFunctionsConfig

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 97c7cf560e740af173c40ec92f07f9fe8f811c96
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Sep 16 14:33:07 2020 +0800

    [FLINK-19256] [core] Move Flink config validation out of StatefulFunctionsConfig
    
    Instead of always validating the Flink configuration when creating a
    StatefulFunctionsConfig, we now validate it only in
    StatefulFunctionsJob.
---
 .../flink/core/ReflectiveFlinkConfigExtractor.java | 43 ++++++++++++++++++++++
 .../flink/core/StatefulFunctionsConfig.java        | 18 ++-------
 .../statefun/flink/core/StatefulFunctionsJob.java  |  8 +++-
 3 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
new file mode 100644
index 0000000..72bc44c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+class ReflectiveFlinkConfigExtractor {
+
+  static Configuration extractFromEnv(StreamExecutionEnvironment env) {
+    try {
+      return (Configuration) getConfigurationMethod().invoke(env);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+      throw new RuntimeException(
+          "Failed to acquire the Flink configuration from the current environment", e);
+    }
+  }
+
+  private static Method getConfigurationMethod() throws NoSuchMethodException {
+    Method getConfiguration =
+        StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
+    getConfiguration.setAccessible(true);
+    return getConfiguration;
+  }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index ae338f3..9c96bce 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -21,8 +21,6 @@ import static org.apache.flink.configuration.description.TextElement.code;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -106,20 +104,12 @@ public class StatefulFunctionsConfig implements Serializable {
    * current environment set via the {@code flink-conf.yaml}.
    */
   public static StatefulFunctionsConfig fromEnvironment(StreamExecutionEnvironment env) {
-    Configuration configuration = getConfiguration(env);
+    Configuration configuration = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
     return new StatefulFunctionsConfig(configuration);
   }
 
-  private static Configuration getConfiguration(StreamExecutionEnvironment env) {
-    try {
-      Method getConfiguration =
-          StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
-      getConfiguration.setAccessible(true);
-      return (Configuration) getConfiguration.invoke(env);
-    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-      throw new RuntimeException(
-          "Failed to acquire the Flink configuration from the current environment", e);
-    }
+  public static StatefulFunctionsConfig fromFlinkConfiguration(Configuration flinkConfiguration) {
+    return new StatefulFunctionsConfig(flinkConfiguration);
   }
 
   private MessageFactoryType factoryType;
@@ -142,8 +132,6 @@ public class StatefulFunctionsConfig implements Serializable {
    * @param configuration a configuration to read the values from
    */
   public StatefulFunctionsConfig(Configuration configuration) {
-    StatefulFunctionsConfigValidator.validate(configuration);
-
     this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
     this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 2398cda..803bde4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -22,6 +22,7 @@ import java.net.URLClassLoader;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +35,12 @@ public class StatefulFunctionsJob {
     Map<String, String> globalConfigurations = parameterTool.toMap();
 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-    StatefulFunctionsConfig stateFunConfig = StatefulFunctionsConfig.fromEnvironment(env);
+
+    Configuration flinkConfig = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
+    StatefulFunctionsConfigValidator.validate(flinkConfig);
+
+    StatefulFunctionsConfig stateFunConfig =
+        StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfig);
     stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
     stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());