You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/11/06 20:19:48 UTC

[flink] 03/04: [FLINK-29379][streaming] Migrate TASK_CANCELLATION options in ExecutionConfig to Configuration

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bf0d28aae03542e5f7d1273d34c512349b85c841
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Nov 3 09:31:33 2022 +0100

    [FLINK-29379][streaming] Migrate TASK_CANCELLATION options in ExecutionConfig to Configuration
---
 .../apache/flink/api/common/ExecutionConfig.java   | 32 ++++++++++------------
 .../pyflink/common/tests/test_execution_config.py  |  4 +--
 .../org/apache/flink/runtime/taskmanager/Task.java | 23 +++++++++-------
 3 files changed, 29 insertions(+), 30 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 461cdb4e107..4d618173e58 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -152,14 +152,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
     private boolean isDynamicGraph = false;
 
-    private long taskCancellationIntervalMillis = -1;
-
-    /**
-     * Timeout after which an ongoing task cancellation will lead to a fatal TaskManager error,
-     * usually killing the JVM.
-     */
-    private long taskCancellationTimeoutMillis = -1;
-
     // ------------------------------- User code values --------------------------------------------
 
     private GlobalJobParameters globalJobParameters = new GlobalJobParameters();
@@ -383,7 +375,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * Gets the interval (in milliseconds) between consecutive attempts to cancel a running task.
      */
     public long getTaskCancellationInterval() {
-        return this.taskCancellationIntervalMillis;
+        return configuration.get(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
     }
 
     /**
@@ -393,7 +385,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      * @param interval the interval (in milliseconds).
      */
     public ExecutionConfig setTaskCancellationInterval(long interval) {
-        this.taskCancellationIntervalMillis = interval;
+        configuration.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, interval);
         return this;
     }
 
@@ -406,7 +398,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
      */
     @PublicEvolving
     public long getTaskCancellationTimeout() {
-        return this.taskCancellationTimeoutMillis;
+        return configuration.get(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
     }
 
     /**
@@ -424,7 +416,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
     @PublicEvolving
     public ExecutionConfig setTaskCancellationTimeout(long timeout) {
         checkArgument(timeout >= 0, "Timeout needs to be >= 0.");
-        this.taskCancellationTimeoutMillis = timeout;
+        configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, timeout);
         return this;
     }
 
@@ -962,7 +954,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                     && defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses)
                     && registeredKryoTypes.equals(other.registeredKryoTypes)
                     && registeredPojoTypes.equals(other.registeredPojoTypes)
-                    && taskCancellationIntervalMillis == other.taskCancellationIntervalMillis
                     && isDynamicGraph == other.isDynamicGraph;
 
         } else {
@@ -980,7 +971,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 defaultKryoSerializerClasses,
                 registeredKryoTypes,
                 registeredPojoTypes,
-                taskCancellationIntervalMillis,
                 isDynamicGraph);
     }
 
@@ -993,10 +983,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                 + executionRetryDelay
                 + ", restartStrategyConfiguration="
                 + restartStrategyConfiguration
-                + ", taskCancellationIntervalMillis="
-                + taskCancellationIntervalMillis
-                + ", taskCancellationTimeoutMillis="
-                + taskCancellationTimeoutMillis
                 + ", globalJobParameters="
                 + globalJobParameters
                 + ", registeredTypesWithKryoSerializers="
@@ -1178,6 +1164,16 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
                                                 == JobManagerOptions.SchedulerType.AdaptiveBatch));
     }
 
+    /**
+     * @return A copy of internal {@link #configuration}. Note it is missing all options that are
+     *     stored as plain java fields in {@link ExecutionConfig}, for example {@link
+     *     #registeredKryoTypes} or {@link #globalJobParameters}.
+     */
+    @Internal
+    public Configuration toConfiguration() {
+        return new Configuration(configuration);
+    }
+
     private LinkedHashSet<Class<?>> loadClasses(
             List<String> classNames, ClassLoader classLoader, String errorMessage) {
         return classNames.stream()
diff --git a/flink-python/pyflink/common/tests/test_execution_config.py b/flink-python/pyflink/common/tests/test_execution_config.py
index 5c8917f0210..851413f26ff 100644
--- a/flink-python/pyflink/common/tests/test_execution_config.py
+++ b/flink-python/pyflink/common/tests/test_execution_config.py
@@ -77,7 +77,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
     def test_get_set_task_cancellation_interval(self):
 
-        self.assertEqual(self.execution_config.get_task_cancellation_interval(), -1)
+        self.assertEqual(self.execution_config.get_task_cancellation_interval(), 30000)
 
         self.execution_config.set_task_cancellation_interval(1000)
 
@@ -85,7 +85,7 @@ class ExecutionConfigTests(PyFlinkTestCase):
 
     def test_get_set_task_cancellation_timeout(self):
 
-        self.assertEqual(self.execution_config.get_task_cancellation_timeout(), -1)
+        self.assertEqual(self.execution_config.get_task_cancellation_timeout(), 180000)
 
         self.execution_config.set_task_cancellation_timeout(3000)
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 360290c31ed..2f65781984f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -612,16 +612,19 @@ public class Task
             userCodeClassLoader = createUserCodeClassloader();
             final ExecutionConfig executionConfig =
                     serializedExecutionConfig.deserializeValue(userCodeClassLoader.asClassLoader());
-
-            if (executionConfig.getTaskCancellationInterval() >= 0) {
-                // override task cancellation interval from Flink config if set in ExecutionConfig
-                taskCancellationInterval = executionConfig.getTaskCancellationInterval();
-            }
-
-            if (executionConfig.getTaskCancellationTimeout() >= 0) {
-                // override task cancellation timeout from Flink config if set in ExecutionConfig
-                taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
-            }
+            Configuration executionConfigConfiguration = executionConfig.toConfiguration();
+
+            // override task cancellation interval from Flink config if set in ExecutionConfig
+            taskCancellationInterval =
+                    executionConfigConfiguration
+                            .getOptional(TaskManagerOptions.TASK_CANCELLATION_INTERVAL)
+                            .orElse(taskCancellationInterval);
+
+            // override task cancellation timeout from Flink config if set in ExecutionConfig
+            taskCancellationTimeout =
+                    executionConfigConfiguration
+                            .getOptional(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT)
+                            .orElse(taskCancellationTimeout);
 
             if (isCanceledOrFailed()) {
                 throw new CancelTaskException();