You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2021/11/18 19:10:27 UTC

[flink] 03/04: [FLINK-24255][tests] Test environments respect configuration when being instantiated.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57253c5fc880fff880526a8954446c8189ac7c72
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Aug 27 02:27:10 2021 +0200

    [FLINK-24255][tests] Test environments respect configuration when being instantiated.
    
    This closes #17240
---
 .../streaming/util/TestStreamEnvironment.java      | 13 ++++++++---
 .../MiniClusterPipelineExecutorServiceLoader.java  | 25 +++++++++++++++++++---
 .../apache/flink/test/util/TestEnvironment.java    |  4 +++-
 3 files changed, 35 insertions(+), 7 deletions(-)

diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index b59f598..00f5692 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -48,19 +48,26 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 
     public TestStreamEnvironment(
             MiniCluster miniCluster,
+            Configuration config,
             int parallelism,
             Collection<Path> jarFiles,
             Collection<URL> classPaths) {
         super(
                 new MiniClusterPipelineExecutorServiceLoader(miniCluster),
-                MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
+                MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(
+                        config, jarFiles, classPaths),
                 null);
 
         setParallelism(parallelism);
     }
 
     public TestStreamEnvironment(MiniCluster miniCluster, int parallelism) {
-        this(miniCluster, parallelism, Collections.emptyList(), Collections.emptyList());
+        this(
+                miniCluster,
+                new Configuration(),
+                parallelism,
+                Collections.emptyList(),
+                Collections.emptyList());
     }
 
     /**
@@ -83,7 +90,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
                 conf -> {
                     TestStreamEnvironment env =
                             new TestStreamEnvironment(
-                                    miniCluster, parallelism, jarFiles, classpaths);
+                                    miniCluster, conf, parallelism, jarFiles, classpaths);
 
                     randomizeConfiguration(miniCluster, conf);
 
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
index 9bc2e60..b181d51 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
@@ -20,6 +20,7 @@ package org.apache.flink.test.util;
 
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
@@ -36,6 +37,9 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -48,6 +52,10 @@ import java.util.stream.Stream;
  * PipelineExecutors} that use a given {@link MiniCluster}.
  */
 public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MiniClusterPipelineExecutorServiceLoader.class);
+
     public static final String NAME = "minicluster";
 
     private final MiniCluster miniCluster;
@@ -60,9 +68,14 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto
      * Populates a {@link Configuration} that is compatible with this {@link
      * MiniClusterPipelineExecutorServiceLoader}.
      */
-    public static Configuration createConfiguration(
-            Collection<Path> jarFiles, Collection<URL> classPaths) {
-        Configuration config = new Configuration();
+    public static Configuration updateConfigurationForMiniCluster(
+            Configuration config, Collection<Path> jarFiles, Collection<URL> classPaths) {
+
+        checkOverridesOption(config, PipelineOptions.JARS);
+        checkOverridesOption(config, PipelineOptions.CLASSPATHS);
+        checkOverridesOption(config, DeploymentOptions.TARGET);
+        checkOverridesOption(config, DeploymentOptions.ATTACHED);
+
         ConfigUtils.encodeCollectionToConfig(
                 config,
                 PipelineOptions.JARS,
@@ -75,6 +88,12 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto
         return config;
     }
 
+    private static void checkOverridesOption(Configuration config, ConfigOption<?> option) {
+        if (config.contains(option)) {
+            LOG.warn("Overriding config setting '{}' for MiniCluster.", option.key());
+        }
+    }
+
     private static String getAbsoluteURL(Path path) {
         FileSystem fs;
         try {
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index ac7c311..b63d9c4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.util.Preconditions;
@@ -46,7 +47,8 @@ public class TestEnvironment extends ExecutionEnvironment {
             Collection<URL> classPaths) {
         super(
                 new MiniClusterPipelineExecutorServiceLoader(miniCluster),
-                MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
+                MiniClusterPipelineExecutorServiceLoader.updateConfigurationForMiniCluster(
+                        new Configuration(), jarFiles, classPaths),
                 null);
 
         this.miniCluster = Preconditions.checkNotNull(miniCluster);