You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/17 07:40:53 UTC
[flink-statefun] 01/03: [FLINK-19256] [core] Move Flink config validation out of StatefulFunctionsConfig
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 97c7cf560e740af173c40ec92f07f9fe8f811c96
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Sep 16 14:33:07 2020 +0800
[FLINK-19256] [core] Move Flink config validation out of StatefulFunctionsConfig
Instead of always validating the Flink configuration when creating a
StatefulFunctionsConfig, we now validate it only in
StatefulFunctionsJob.
---
.../flink/core/ReflectiveFlinkConfigExtractor.java | 43 ++++++++++++++++++++++
.../flink/core/StatefulFunctionsConfig.java | 18 ++-------
.../statefun/flink/core/StatefulFunctionsJob.java | 8 +++-
3 files changed, 53 insertions(+), 16 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
new file mode 100644
index 0000000..72bc44c
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/ReflectiveFlinkConfigExtractor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+class ReflectiveFlinkConfigExtractor { // Reads the Flink Configuration out of a StreamExecutionEnvironment via reflection.
+
+ static Configuration extractFromEnv(StreamExecutionEnvironment env) { // Returns the result of invoking getConfiguration on env.
+ try {
+ return (Configuration) getConfigurationMethod().invoke(env);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException( // Reflection failures are rethrown unchecked, keeping the original exception as the cause.
+ "Failed to acquire the Flink configuration from the current environment", e);
+ }
+ }
+
+ private static Method getConfigurationMethod() throws NoSuchMethodException { // Looked up anew on every call; the Method is not cached.
+ Method getConfiguration =
+ StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
+ getConfiguration.setAccessible(true); // Presumably getConfiguration is not public on StreamExecutionEnvironment; TODO confirm.
+ return getConfiguration;
+ }
+}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index ae338f3..9c96bce 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -21,8 +21,6 @@ import static org.apache.flink.configuration.description.TextElement.code;
import java.io.IOException;
import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -106,20 +104,12 @@ public class StatefulFunctionsConfig implements Serializable {
* current environment set via the {@code flink-conf.yaml}.
*/
public static StatefulFunctionsConfig fromEnvironment(StreamExecutionEnvironment env) {
- Configuration configuration = getConfiguration(env);
+ Configuration configuration = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
return new StatefulFunctionsConfig(configuration);
}
- private static Configuration getConfiguration(StreamExecutionEnvironment env) {
- try {
- Method getConfiguration =
- StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
- getConfiguration.setAccessible(true);
- return (Configuration) getConfiguration.invoke(env);
- } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(
- "Failed to acquire the Flink configuration from the current environment", e);
- }
+ public static StatefulFunctionsConfig fromFlinkConfiguration(Configuration flinkConfiguration) {
+ return new StatefulFunctionsConfig(flinkConfiguration);
}
private MessageFactoryType factoryType;
@@ -142,8 +132,6 @@ public class StatefulFunctionsConfig implements Serializable {
* @param configuration a configuration to read the values from
*/
public StatefulFunctionsConfig(Configuration configuration) {
- StatefulFunctionsConfigValidator.validate(configuration);
-
this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
this.flinkJobName = configuration.get(FLINK_JOB_NAME);
this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 2398cda..803bde4 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -22,6 +22,7 @@ import java.net.URLClassLoader;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +35,12 @@ public class StatefulFunctionsJob {
Map<String, String> globalConfigurations = parameterTool.toMap();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StatefulFunctionsConfig stateFunConfig = StatefulFunctionsConfig.fromEnvironment(env);
+
+ Configuration flinkConfig = ReflectiveFlinkConfigExtractor.extractFromEnv(env);
+ StatefulFunctionsConfigValidator.validate(flinkConfig);
+
+ StatefulFunctionsConfig stateFunConfig =
+ StatefulFunctionsConfig.fromFlinkConfiguration(flinkConfig);
stateFunConfig.addAllGlobalConfigurations(globalConfigurations);
stateFunConfig.setProvider(new StatefulFunctionsUniverses.ClassPathUniverseProvider());