You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/06 20:19:49 UTC

[flink] 04/04: [FLINK-29379][streaming] Migrate isDynamicGraph in ExecutionConfig to Configuration

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 12efbb9c85c605db55a2aea2a7bbea62fa930e1d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 10:43:14 2022 +0100

    [FLINK-29379][streaming] Migrate isDynamicGraph in ExecutionConfig to Configuration
---
 .../apache/flink/api/common/ExecutionConfig.java   | 32 ++++++++++------------
 .../partitioner/StreamPartitionerTestUtils.java    |  9 ++++--
 .../plan/batch/sql/ForwardHashExchangeTest.java    |  3 +-
 .../runtime/batch/ParallelismSettingTest.java      |  3 +-
 .../batch/sql/ForwardHashExchangeITCase.java       |  3 +-
 5 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 4d618173e58..1e75154edc3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -29,6 +30,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
@@ -150,8 +152,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
             new RestartStrategies.FallbackRestartStrategyConfiguration();
 
-    private boolean isDynamicGraph = false;
-
     // ------------------------------- User code values --------------------------------------------
 
     private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -464,14 +464,20 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
         }
     }
 
+    /**
+     * TODO: this method shouldn't exist, as it pollutes the public API. Tests should set the
+     * scheduler via Configuration instead.
+     */
+    @VisibleForTesting
     @Internal
-    public void setDynamicGraph(boolean dynamicGraph) {
-        isDynamicGraph = dynamicGraph;
+    public ExecutionConfig setScheduler(SchedulerType schedulerType) {
+        configuration.set(JobManagerOptions.SCHEDULER, schedulerType);
+        return this;
     }
 
     @Internal
     public boolean isDynamicGraph() {
-        return isDynamicGraph;
+        return configuration.get(JobManagerOptions.SCHEDULER) == SchedulerType.AdaptiveBatch;
     }
 
     /**
@@ -953,8 +959,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                             other.registeredTypesWithKryoSerializerClasses)
                     && defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
                     && registeredKryoTypes.equals(other.registeredKryoTypes)
-                    && registeredPojoTypes.equals(other.registeredPojoTypes)
-                    && isDynamicGraph == other.isDynamicGraph;
+                    && registeredPojoTypes.equals(other.registeredPojoTypes);
 
         } else {
             return false;
@@ -970,8 +975,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 registeredTypesWithKryoSerializerClasses,
                 defaultKryoSerializerClasses,
                 registeredKryoTypes,
-                registeredPojoTypes,
-                isDynamicGraph);
+                registeredPojoTypes);
     }
 
     @Override
@@ -997,8 +1001,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 + registeredKryoTypes
                 + ", registeredPojoTypes="
                 + registeredPojoTypes
-                + ", isDynamicGraph="
-                + isDynamicGraph
                 + '}';
     }
 
@@ -1155,13 +1157,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 .map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
                 .ifPresent(c -> this.registeredKryoTypes = c);
 
-        configuration
-                .getOptional(JobManagerOptions.SCHEDULER)
-                .ifPresent(
-                        schedulerType ->
-                                this.setDynamicGraph(
-                                        schedulerType
-                                                == JobManagerOptions.SchedulerType.AdaptiveBatch));
+        configuration.getOptional(JobManagerOptions.SCHEDULER).ifPresent(this::setScheduler);
     }
 
     /**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
index 12e9d54aa6e..5ac50c654a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
@@ -18,6 +18,9 @@
 package org.apache.flink.streaming.runtime.partitioner;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -45,9 +48,11 @@ public class StreamPartitionerTestUtils {
             StreamPartitioner<Long> streamPartitioner,
             StreamExchangeMode exchangeMode) {
 
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SCHEDULER, SchedulerType.AdaptiveBatch);
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-        env.getConfig().setDynamicGraph(true);
 
         final DataStream<Long> source =
                 env.fromSequence(0, 99).slotSharingGroup(sourceSlotSharingGroup).name("source");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
index b5edb3632f9..c9d3e42edae 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.table.planner.plan.batch.sql;
 
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -35,7 +36,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
     public void before() {
         util = batchTestUtil(TableConfig.getDefault());
 
-        util.getStreamEnv().getConfig().setDynamicGraph(true);
+        util.getStreamEnv().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
         util.tableEnv()
                 .getConfig()
                 .getConfiguration()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
index fa3f1c248df..7fed38f5831 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.runtime.batch;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -41,7 +42,7 @@ public class ParallelismSettingTest extends TableTestBase {
     public void before() {
         util = batchTestUtil(TableConfig.getDefault());
 
-        util.getStreamEnv().getConfig().setDynamicGraph(true);
+        util.getStreamEnv().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE MyTable (\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
index 522eddc2eaa..f98ffce0af5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.runtime.batch.sql;
 
 import org.apache.flink.api.common.BatchShuffleMode;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
@@ -37,7 +38,7 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
     @Before
     public void before() throws Exception {
         super.before();
-        env().getConfig().setDynamicGraph(true);
+        env().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
         env().disableOperatorChaining();
         tEnv().getConfig()
                 .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);