You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 23:35:34 UTC

[3/3] flink git commit: [FLINK-5057] [taskmanager] Read cancellation timeout from task manager config

[FLINK-5057] [taskmanager] Read cancellation timeout from task manager config

This closes #2793


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf06a1cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf06a1cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf06a1cc

Branch: refs/heads/master
Commit: bf06a1cc786c0a7b8c8d446be01a63edf2cc0897
Parents: a5ec849
Author: Ufuk Celebi <uc...@apache.org>
Authored: Sat Nov 12 20:19:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 17 00:34:55 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java   | 5 +++--
 .../org/apache/flink/runtime/taskmanager/TaskStopTest.java     | 5 ++++-
 .../java/org/apache/flink/runtime/taskmanager/TaskTest.java    | 6 +++---
 3 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b960e68..3254fc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -299,8 +299,9 @@ public class Task implements Runnable, TaskActions {
 		this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();
 		this.taskStateHandles = taskStateHandles;
 
-		this.taskCancellationInterval = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
-		this.taskCancellationTimeout = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
+		Configuration tmConfig = taskManagerConfig.getConfiguration();
+		this.taskCancellationInterval = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
+		this.taskCancellationTimeout = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
 
 		this.memoryManager = Preconditions.checkNotNull(memManager);
 		this.ioManager = Preconditions.checkNotNull(ioManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 276e090..d80dab3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -64,6 +64,9 @@ public class TaskStopTest {
 		TaskInfo taskInfoMock = mock(TaskInfo.class);
 		when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName");
 
+		TaskManagerRuntimeInfo tmRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
+		when(tmRuntimeInfo.getConfiguration()).thenReturn(new Configuration());
+
 		task = new Task(
 			mock(JobInformation.class),
 			new TaskInformation(
@@ -89,7 +92,7 @@ public class TaskStopTest {
 			mock(CheckpointResponder.class),
 			mock(LibraryCacheManager.class),
 			mock(FileCache.class),
-			mock(TaskManagerRuntimeInfo.class),
+			tmRuntimeInfo,
 			mock(TaskMetricGroup.class),
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionStateChecker.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5d26050..8177bf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -781,7 +781,7 @@ public class TaskTest extends TestLogger {
 		ResultPartitionConsumableNotifier consumableNotifier,
 		PartitionStateChecker partitionStateChecker,
 		Executor executor,
-		Configuration taskConfig,
+		Configuration taskManagerConfig,
 		ExecutionConfig execConfig) throws IOException {
 		
 		JobID jobId = new JobID();
@@ -813,7 +813,7 @@ public class TaskTest extends TestLogger {
 			1,
 			1,
 			invokable.getName(),
-			taskConfig);
+			new Configuration());
 		
 		return new Task(
 			jobInformation,
@@ -834,7 +834,7 @@ public class TaskTest extends TestLogger {
 			checkpointResponder,
 			libCache,
 			mock(FileCache.class),
-			new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+			new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
 			mock(TaskMetricGroup.class),
 			consumableNotifier,
 			partitionStateChecker,