You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/06 20:19:48 UTC
[flink] 03/04: [FLINK-29379][streaming] Migrate TASK_CANCELLATION options in ExecutionConfig to Configuration
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bf0d28aae03542e5f7d1273d34c512349b85c841
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 09:31:33 2022 +0100
[FLINK-29379][streaming] Migrate TASK_CANCELLATION options in ExecutionConfig to Configuration
---
.../apache/flink/api/common/ExecutionConfig.java | 32 ++++++++++------------
.../pyflink/common/tests/test_execution_config.py | 4 +--
.../org/apache/flink/runtime/taskmanager/Task.java | 23 +++++++++-------
3 files changed, 29 insertions(+), 30 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 461cdb4e107..4d618173e58 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -152,14 +152,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private boolean isDynamicGraph = false;
- private long taskCancellationIntervalMillis = -1;
-
- /**
- * Timeout after which an ongoing task cancellation will lead to a fatal TaskManager error,
- * usually killing the JVM.
- */
- private long taskCancellationTimeoutMillis = -1;
-
// ------------------------------- User code values --------------------------------------------
private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -383,7 +375,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* Gets the interval (in milliseconds) between consecutive attempts to cancel a running task.
*/
public long getTaskCancellationInterval() {
- return this.taskCancellationIntervalMillis;
+ return configuration.get(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
}
/**
@@ -393,7 +385,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @param interval the interval (in milliseconds).
*/
public ExecutionConfig setTaskCancellationInterval(long interval) {
- this.taskCancellationIntervalMillis = interval;
+ configuration.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, interval);
return this;
}
@@ -406,7 +398,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public long getTaskCancellationTimeout() {
- return this.taskCancellationTimeoutMillis;
+ return configuration.get(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
}
/**
@@ -424,7 +416,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
@PublicEvolving
public ExecutionConfig setTaskCancellationTimeout(long timeout) {
checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
- this.taskCancellationTimeoutMillis = timeout;
+ configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, timeout);
return this;
}
@@ -962,7 +954,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
&& defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
&& registeredKryoTypes.equals(other.registeredKryoTypes)
&& registeredPojoTypes.equals(other.registeredPojoTypes)
- && taskCancellationIntervalMillis == other.taskCancellationIntervalMillis
&& isDynamicGraph == other.isDynamicGraph;
} else {
@@ -980,7 +971,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
defaultKryoSerializerClasses,
registeredKryoTypes,
registeredPojoTypes,
- taskCancellationIntervalMillis,
isDynamicGraph);
}
@@ -993,10 +983,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
+ executionRetryDelay
+ ", restartStrategyConfiguration="
+ restartStrategyConfiguration
- + ", taskCancellationIntervalMillis="
- + taskCancellationIntervalMillis
- + ", taskCancellationTimeoutMillis="
- + taskCancellationTimeoutMillis
+ ", globalJobParameters="
+ globalJobParameters
+ ", registeredTypesWithKryoSerializers="
@@ -1178,6 +1164,16 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
== JobManagerOptions.SchedulerType.AdaptiveBatch));
}
+ /**
+ * @return A copy of internal {@link #configuration}. Note it is missing all options that are
+ * stored as plain java fields in {@link ExecutionConfig}, for example {@link
+ * #registeredKryoTypes} or {@link #globalJobParameters}.
+ */
+ @Internal
+ public Configuration toConfiguration() {
+ return new Configuration(configuration);
+ }
+
private LinkedHashSet<Class<?>> loadClasses(
List<String> classNames, ClassLoader classLoader, String errorMessage) {
return classNames.stream()
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index 5c8917f0210..851413f26ff 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -77,7 +77,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
def test_get_set_task_cancellation_interval(self):
- self.assertEqual(self.execution_config.get_task_cancellation_interval(), -1)
+ self.assertEqual(self.execution_config.get_task_cancellation_interval(), 30000)
self.execution_config.set_task_cancellation_interval(1000)
@@ -85,7 +85,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
def test_get_set_task_cancellation_timeout(self):
- self.assertEqual(self.execution_config.get_task_cancellation_timeout(), -1)
+ self.assertEqual(self.execution_config.get_task_cancellation_timeout(), 180000)
self.execution_config.set_task_cancellation_timeout(3000)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 360290c31ed..2f65781984f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -612,16 +612,19 @@ public class Task
userCodeClassLoader = createUserCodeClassloader();
final ExecutionConfig executionConfig =
serializedExecutionConfig.deserializeValue(userCodeClassLoader.asClassLoader());
-
- if (executionConfig.getTaskCancellationInterval() >= 0) {
- // override task cancellation interval from Flink config if set in ExecutionConfig
- taskCancellationInterval = executionConfig.getTaskCancellationInterval();
- }
-
- if (executionConfig.getTaskCancellationTimeout() >= 0) {
- // override task cancellation timeout from Flink config if set in ExecutionConfig
- taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
- }
+ Configuration executionConfigConfiguration = executionConfig.toConfiguration();
+
+ // override task cancellation interval from Flink config if set in ExecutionConfig
+ taskCancellationInterval =
+ executionConfigConfiguration
+ .getOptional(TaskManagerOptions.TASK_CANCELLATION_INTERVAL)
+ .orElse(taskCancellationInterval);
+
+ // override task cancellation timeout from Flink config if set in ExecutionConfig
+ taskCancellationTimeout =
+ executionConfigConfiguration
+ .getOptional(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT)
+ .orElse(taskCancellationTimeout);
if (isCanceledOrFailed()) {
throw new CancelTaskException();