You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/29 22:51:14 UTC

[7/7] flink git commit: [hotfix] [dist. coordination] Remove redundant method 'ExecutionVertex.getSimpleName()'

[hotfix] [dist. coordination] Remove redundant method 'ExecutionVertex.getSimpleName()'

Replace the method via identical method 'getTaskNameWithSubtaskIndex'.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca681101
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca681101
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca681101

Branch: refs/heads/master
Commit: ca681101fa7c813345dc3125a3ec7af22563ab00
Parents: 719d0cf
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 29 22:32:53 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 29 22:32:53 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  4 ++--
 .../flink/runtime/executiongraph/Execution.java |  6 ++---
 .../runtime/executiongraph/ExecutionVertex.java | 24 ++++++++------------
 .../runtime/jobmanager/scheduler/Scheduler.java |  2 +-
 .../ExecutionGraphDeploymentTest.java           |  2 +-
 .../scheduler/SchedulerTestUtils.java           |  2 +-
 6 files changed, 18 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index cc60837..7087540 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -435,7 +435,7 @@ public class CheckpointCoordinator {
 				executions[i] = ee;
 			} else {
 				LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
-						tasksToTrigger[i].getSimpleName());
+						tasksToTrigger[i].getTaskNameWithSubtaskIndex());
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}
@@ -450,7 +450,7 @@ public class CheckpointCoordinator {
 				ackTasks.put(ee.getAttemptId(), ev);
 			} else {
 				LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
-						ev.getSimpleName());
+						ev.getTaskNameWithSubtaskIndex());
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 1a3ef11..729e161 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -357,7 +357,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			}
 
 			if (LOG.isInfoEnabled()) {
-				LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
+				LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
 						attemptNumber, getAssignedResourceLocation().getHostname()));
 			}
 
@@ -1071,7 +1071,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	public String getVertexWithAttempt() {
-		return vertex.getSimpleName() + " - execution #" + attemptNumber;
+		return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + attemptNumber;
 	}
 
 	// ------------------------------------------------------------------------
@@ -1126,7 +1126,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	
 	@Override
 	public String toString() {
-		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
+		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
 				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c7829fa..90820e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -188,6 +188,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		return this.jobVertex.getJobVertex().getName();
 	}
 
+	/**
+	 * Creates a simple name representation in the style 'taskname (x/y)', where
+	 * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
+	 * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
+	 * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
+	 *
+	 * @return A simple name representation in the form 'myTask (2/7)'
+	 */
 	@Override
 	public String getTaskNameWithSubtaskIndex() {
 		return this.taskNameWithSubtask;
@@ -503,7 +511,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	public void resetForNewExecution() {
 
-		LOG.debug("Resetting execution vertex {} for new execution.", getSimpleName());
+		LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
 
 		synchronized (priorExecutions) {
 			Execution execution = currentExecution;
@@ -722,21 +730,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	//  Utilities
 	// --------------------------------------------------------------------------------------------
 
-	/**
-	 * Creates a simple name representation in the style 'taskname (x/y)', where
-	 * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel
-	 * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total
-	 * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}.
-	 *
-	 * @return A simple name representation.
-	 */
-	public String getSimpleName() {
-		return taskNameWithSubtask;
-	}
-
 	@Override
 	public String toString() {
-		return getSimpleName();
+		return getTaskNameWithSubtaskIndex();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 58dac3e..af72d7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -570,7 +570,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 								queued.getFuture().complete(newSlot);
 							}
 							catch (Throwable t) {
-								LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
+								LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), t);
 								task.getTaskToExecute().fail(t);
 							}
 						}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 7f5811a..8d91b84 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -359,7 +359,7 @@ public class ExecutionGraphDeploymentTest {
 			Collections.sort(execList, new Comparator<Execution>() {
 				@Override
 				public int compare(Execution o1, Execution o2) {
-					return o1.getVertex().getSimpleName().compareTo(o2.getVertex().getSimpleName());
+					return o1.getVertex().getTaskNameWithSubtaskIndex().compareTo(o2.getVertex().getTaskNameWithSubtaskIndex());
 				}
 			});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 9e692ff..4312b0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -126,7 +126,7 @@ public class SchedulerTestUtils {
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
 		when(vertex.getMaxParallelism()).thenReturn(numTasks);
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
-		when(vertex.getSimpleName()).thenReturn("TEST-VERTEX");
+		when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("TEST-VERTEX");
 		
 		Execution execution = mock(Execution.class);
 		when(execution.getVertex()).thenReturn(vertex);