You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/16 23:35:34 UTC
[3/3] flink git commit: [FLINK-5057] [taskmanager] Read cancellation timeout from task manager config
[FLINK-5057] [taskmanager] Read cancellation timeout from task manager config
This closes #2793
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf06a1cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf06a1cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf06a1cc
Branch: refs/heads/master
Commit: bf06a1cc786c0a7b8c8d446be01a63edf2cc0897
Parents: a5ec849
Author: Ufuk Celebi <uc...@apache.org>
Authored: Sat Nov 12 20:19:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 17 00:34:55 2016 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/runtime/taskmanager/Task.java | 5 +++--
.../org/apache/flink/runtime/taskmanager/TaskStopTest.java | 5 ++++-
.../java/org/apache/flink/runtime/taskmanager/TaskTest.java | 6 +++---
3 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b960e68..3254fc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -299,8 +299,9 @@ public class Task implements Runnable, TaskActions {
this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();
this.taskStateHandles = taskStateHandles;
- this.taskCancellationInterval = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
- this.taskCancellationTimeout = taskConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
+ Configuration tmConfig = taskManagerConfig.getConfiguration();
+ this.taskCancellationInterval = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
+ this.taskCancellationTimeout = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
this.memoryManager = Preconditions.checkNotNull(memManager);
this.ioManager = Preconditions.checkNotNull(ioManager);
http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 276e090..d80dab3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -64,6 +64,9 @@ public class TaskStopTest {
TaskInfo taskInfoMock = mock(TaskInfo.class);
when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName");
+ TaskManagerRuntimeInfo tmRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
+ when(tmRuntimeInfo.getConfiguration()).thenReturn(new Configuration());
+
task = new Task(
mock(JobInformation.class),
new TaskInformation(
@@ -89,7 +92,7 @@ public class TaskStopTest {
mock(CheckpointResponder.class),
mock(LibraryCacheManager.class),
mock(FileCache.class),
- mock(TaskManagerRuntimeInfo.class),
+ tmRuntimeInfo,
mock(TaskMetricGroup.class),
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionStateChecker.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/bf06a1cc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 5d26050..8177bf7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -781,7 +781,7 @@ public class TaskTest extends TestLogger {
ResultPartitionConsumableNotifier consumableNotifier,
PartitionStateChecker partitionStateChecker,
Executor executor,
- Configuration taskConfig,
+ Configuration taskManagerConfig,
ExecutionConfig execConfig) throws IOException {
JobID jobId = new JobID();
@@ -813,7 +813,7 @@ public class TaskTest extends TestLogger {
1,
1,
invokable.getName(),
- taskConfig);
+ new Configuration());
return new Task(
jobInformation,
@@ -834,7 +834,7 @@ public class TaskTest extends TestLogger {
checkpointResponder,
libCache,
mock(FileCache.class),
- new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
+ new TaskManagerRuntimeInfo("localhost", taskManagerConfig, System.getProperty("java.io.tmpdir")),
mock(TaskMetricGroup.class),
consumableNotifier,
partitionStateChecker,