You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2021/11/18 19:10:27 UTC
[flink] 03/04: [FLINK-24255][tests] Test environments respect configuration when being instantiated.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 57253c5fc880fff880526a8954446c8189ac7c72
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Aug 27 02:27:10 2021 +0200
[FLINK-24255][tests] Test environments respect configuration when being instantiated.
This closes #17240
---
.../streaming/util/TestStreamEnvironment.java | 13 ++++++++---
.../MiniClusterPipelineExecutorServiceLoader.java | 25 +++++++++++++++++++---
.../apache/flink/test/util/TestEnvironment.java | 4 +++-
3 files changed, 35 insertions(+), 7 deletions(-)
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index b59f598..00f5692 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -48,19 +48,26 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
public TestStreamEnvironment(
MiniCluster miniCluster,
+ Configuration config,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths) {
super(
new MiniClusterPipelineExecutorServiceLoader(miniCluster),
- MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
+ MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(
+ config, jarFiles, classPaths),
null);
setParallelism(parallelism);
}
public TestStreamEnvironment(MiniCluster miniCluster, int parallelism) {
- this(miniCluster, parallelism, Collections.emptyList(), Collections.emptyList());
+ this(
+ miniCluster,
+ new Configuration(),
+ parallelism,
+ Collections.emptyList(),
+ Collections.emptyList());
}
/**
@@ -83,7 +90,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
conf -> {
TestStreamEnvironment env =
new TestStreamEnvironment(
- miniCluster, parallelism, jarFiles, classpaths);
+ miniCluster, conf, parallelism, jarFiles, classpaths);
randomizeConfiguration(miniCluster, conf);
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
index 9bc2e60..b181d51 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
@@ -36,6 +37,9 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@@ -48,6 +52,10 @@ import java.util.stream.Stream;
* PipelineExecutors} that use a given {@link MiniCluster}.
*/
public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MiniClusterPipelineExecutorServiceLoader.class);
+
public static final String NAME = "minicluster";
private final MiniCluster miniCluster;
@@ -60,9 +68,14 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto
* Populates a {@link Configuration} that is compatible with this {@link
* MiniClusterPipelineExecutorServiceLoader}.
*/
- public static Configuration createConfiguration(
- Collection<Path> jarFiles, Collection<URL> classPaths) {
- Configuration config = new Configuration();
+ public static Configuration updateConfigurationForMiniCluster(
+ Configuration config, Collection<Path> jarFiles, Collection<URL> classPaths) {
+
+ checkOverridesOption(config, PipelineOptions.JARS);
+ checkOverridesOption(config, PipelineOptions.CLASSPATHS);
+ checkOverridesOption(config, DeploymentOptions.TARGET);
+ checkOverridesOption(config, DeploymentOptions.ATTACHED);
+
ConfigUtils.encodeCollectionToConfig(
config,
PipelineOptions.JARS,
@@ -75,6 +88,12 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto
return config;
}
+ private static void checkOverridesOption(Configuration config, ConfigOption<?> option) {
+ if (config.contains(option)) {
+ LOG.warn("Overriding config setting '{}' for MiniCluster.", option.key());
+ }
+ }
+
private static String getAbsoluteURL(Path path) {
FileSystem fs;
try {
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index ac7c311..b63d9c4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.util.Preconditions;
@@ -46,7 +47,8 @@ public class TestEnvironment extends ExecutionEnvironment {
Collection<URL> classPaths) {
super(
new MiniClusterPipelineExecutorServiceLoader(miniCluster),
- MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
+ MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(
+ new Configuration(), jarFiles, classPaths),
null);
this.miniCluster = Preconditions.checkNotNull(miniCluster);