You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/06 20:19:49 UTC
[flink] 04/04: [FLINK-29379][streaming] Migrate isDynamicGraph in ExecutionConfig to Configuration
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 12efbb9c85c605db55a2aea2a7bbea62fa930e1d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 10:43:14 2022 +0100
[FLINK-29379][streaming] Migrate isDynamicGraph in ExecutionConfig to Configuration
---
.../apache/flink/api/common/ExecutionConfig.java | 32 ++++++++++------------
.../partitioner/StreamPartitionerTestUtils.java | 9 ++++--
.../plan/batch/sql/ForwardHashExchangeTest.java | 3 +-
.../runtime/batch/ParallelismSettingTest.java | 3 +-
.../batch/sql/ForwardHashExchangeITCase.java | 3 +-
5 files changed, 27 insertions(+), 23 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 4d618173e58..1e75154edc3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -29,6 +30,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
@@ -150,8 +152,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
new RestartStrategies.FallbackRestartStrategyConfiguration();
- private boolean isDynamicGraph = false;
-
// ------------------------------- User code values --------------------------------------------
private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -464,14 +464,20 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
}
}
+ /**
+ * TODO: this shouldn't exist and shouldn't pollute public API. Tests should change this via
+ * Configuration
+ */
+ @VisibleForTesting
@Internal
- public void setDynamicGraph(boolean dynamicGraph) {
- isDynamicGraph = dynamicGraph;
+ public ExecutionConfig setScheduler(SchedulerType schedulerType) {
+ configuration.set(JobManagerOptions.SCHEDULER, schedulerType);
+ return this;
}
@Internal
public boolean isDynamicGraph() {
- return isDynamicGraph;
+ return configuration.get(JobManagerOptions.SCHEDULER) == SchedulerType.AdaptiveBatch;
}
/**
@@ -953,8 +959,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
other.registeredTypesWithKryoSerializerClasses)
&& defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
&& registeredKryoTypes.equals(other.registeredKryoTypes)
- && registeredPojoTypes.equals(other.registeredPojoTypes)
- && isDynamicGraph == other.isDynamicGraph;
+ && registeredPojoTypes.equals(other.registeredPojoTypes);
} else {
return false;
@@ -970,8 +975,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
registeredTypesWithKryoSerializerClasses,
defaultKryoSerializerClasses,
registeredKryoTypes,
- registeredPojoTypes,
- isDynamicGraph);
+ registeredPojoTypes);
}
@Override
@@ -997,8 +1001,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
+ registeredKryoTypes
+ ", registeredPojoTypes="
+ registeredPojoTypes
- + ", isDynamicGraph="
- + isDynamicGraph
+ '}';
}
@@ -1155,13 +1157,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
.map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
.ifPresent(c -> this.registeredKryoTypes = c);
- configuration
- .getOptional(JobManagerOptions.SCHEDULER)
- .ifPresent(
- schedulerType ->
- this.setDynamicGraph(
- schedulerType
- == JobManagerOptions.SchedulerType.AdaptiveBatch));
+ configuration.getOptional(JobManagerOptions.SCHEDULER).ifPresent(this::setScheduler);
}
/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
index 12e9d54aa6e..5ac50c654a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java
@@ -18,6 +18,9 @@
package org.apache.flink.streaming.runtime.partitioner;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -45,9 +48,11 @@ public class StreamPartitionerTestUtils {
StreamPartitioner<Long> streamPartitioner,
StreamExchangeMode exchangeMode) {
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.SCHEDULER, SchedulerType.AdaptiveBatch);
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.getConfig().setDynamicGraph(true);
final DataStream<Long> source =
env.fromSequence(0, 99).slotSharingGroup(sourceSlotSharingGroup).name("source");
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
index b5edb3632f9..c9d3e42edae 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.table.planner.plan.batch.sql;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -35,7 +36,7 @@ public class ForwardHashExchangeTest extends TableTestBase {
public void before() {
util = batchTestUtil(TableConfig.getDefault());
- util.getStreamEnv().getConfig().setDynamicGraph(true);
+ util.getStreamEnv().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
util.tableEnv()
.getConfig()
.getConfiguration()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
index fa3f1c248df..7fed38f5831 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/ParallelismSettingTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.runtime.batch;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
@@ -41,7 +42,7 @@ public class ParallelismSettingTest extends TableTestBase {
public void before() {
util = batchTestUtil(TableConfig.getDefault());
- util.getStreamEnv().getConfig().setDynamicGraph(true);
+ util.getStreamEnv().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
util.tableEnv()
.executeSql(
"CREATE TABLE MyTable (\n"
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
index 522eddc2eaa..f98ffce0af5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.runtime.batch.sql;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.JobManagerOptions.SchedulerType;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
@@ -37,7 +38,7 @@ public class ForwardHashExchangeITCase extends BatchTestBase {
@Before
public void before() throws Exception {
super.before();
- env().getConfig().setDynamicGraph(true);
+ env().getConfig().setScheduler(SchedulerType.AdaptiveBatch);
env().disableOperatorChaining();
tEnv().getConfig()
.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_BLOCKING);