You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/29 22:51:14 UTC
[7/7] flink git commit: [hotfix] [dist. coordination] Remove redundant method 'ExecutionVertex.getSimpleName()'
[hotfix] [dist. coordination] Remove redundant method 'ExecutionVertex.getSimpleName()'
Replace the method with the identical method 'getTaskNameWithSubtaskIndex'.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca681101
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca681101
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca681101
Branch: refs/heads/master
Commit: ca681101fa7c813345dc3125a3ec7af22563ab00
Parents: 719d0cf
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 22:32:53 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 22:32:53 2017 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 4 ++--
.../flink/runtime/executiongraph/Execution.java | 6 ++---
.../runtime/executiongraph/ExecutionVertex.java | 24 ++++++++------------
.../runtime/jobmanager/scheduler/Scheduler.java | 2 +-
.../ExecutionGraphDeploymentTest.java | 2 +-
.../scheduler/SchedulerTestUtils.java | 2 +-
6 files changed, 18 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index cc60837..7087540 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -435,7 +435,7 @@ public class CheckpointCoordinator {
executions[i] = ee;
} else {
LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
- tasksToTrigger[i].getSimpleName());
+ tasksToTrigger[i].getTaskNameWithSubtaskIndex());
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
@@ -450,7 +450,7 @@ public class CheckpointCoordinator {
ackTasks.put(ee.getAttemptId(), ev);
} else {
LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
- ev.getSimpleName());
+ ev.getTaskNameWithSubtaskIndex());
return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 1a3ef11..729e161 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -357,7 +357,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
if (LOG.isInfoEnabled()) {
- LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
+ LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
attemptNumber, getAssignedResourceLocation().getHostname()));
}
@@ -1071,7 +1071,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
public String getVertexWithAttempt() {
- return vertex.getSimpleName() + " - execution #" + attemptNumber;
+ return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + attemptNumber;
}
// ------------------------------------------------------------------------
@@ -1126,7 +1126,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public String toString() {
- return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
+ return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c7829fa..90820e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -188,6 +188,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
return this.jobVertex.getJobVertex().getName();
}
+ /**
+ * Creates a simple name representation in the style 'taskname (x/y)', where
+ * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
+ * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
+ * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
+ *
+ * @return A simple name representation in the form 'myTask (2/7)'
+ */
@Override
public String getTaskNameWithSubtaskIndex() {
return this.taskNameWithSubtask;
@@ -503,7 +511,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
public void resetForNewExecution() {
- LOG.debug("Resetting execution vertex {} for new execution.", getSimpleName());
+ LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
synchronized (priorExecutions) {
Execution execution = currentExecution;
@@ -722,21 +730,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
// Utilities
// --------------------------------------------------------------------------------------------
- /**
- * Creates a simple name representation in the style 'taskname (x/y)', where
- * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
- * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
- * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
- *
- * @return A simple name representation.
- */
- public String getSimpleName() {
- return taskNameWithSubtask;
- }
-
@Override
public String toString() {
- return getSimpleName();
+ return getTaskNameWithSubtaskIndex();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 58dac3e..af72d7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -570,7 +570,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
queued.getFuture().complete(newSlot);
}
catch (Throwable t) {
- LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
+ LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), t);
task.getTaskToExecute().fail(t);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 7f5811a..8d91b84 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -359,7 +359,7 @@ public class ExecutionGraphDeploymentTest {
Collections.sort(execList, new Comparator<Execution>() {
@Override
public int compare(Execution o1, Execution o2) {
- return o1.getVertex().getSimpleName().compareTo(o2.getVertex().getSimpleName());
+ return o1.getVertex().getTaskNameWithSubtaskIndex().compareTo(o2.getVertex().getTaskNameWithSubtaskIndex());
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 9e692ff..4312b0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -126,7 +126,7 @@ public class SchedulerTestUtils {
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
when(vertex.getMaxParallelism()).thenReturn(numTasks);
when(vertex.toString()).thenReturn("TEST-VERTEX");
- when(vertex.getSimpleName()).thenReturn("TEST-VERTEX");
+ when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("TEST-VERTEX");
Execution execution = mock(Execution.class);
when(execution.getVertex()).thenReturn(vertex);