You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:30 UTC

[4/6] flink git commit: [hotfix] Minor code cleanups in the ExecutionGraph's Execution

[hotfix] Minor code cleanups in the ExecutionGraph's Execution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10e4e321
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10e4e321
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10e4e321

Branch: refs/heads/master
Commit: 10e4e321b335b6f9376501f90715e31b71b02da8
Parents: 2e107b1
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 17:21:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 24 ++++++++------------
 .../taskmanager/TaskManagerLocation.java        |  2 +-
 2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 18a4445..c2fe5ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -70,6 +71,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -112,7 +114,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private final Time timeout;
 
-	private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
+	private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
 	private volatile ExecutionState state = CREATED;
 
@@ -120,8 +122,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
-	private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution
-
 	private TaskStateHandles taskStateHandles;
 
 	/** The executor which is used to execute futures. */
@@ -189,7 +189,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
-		return assignedResourceLocation;
+		// returns non-null only when a location is already assigned
+		return assignedResource != null ? assignedResource.getTaskManagerLocation() : null;
 	}
 
 	public Throwable getFailureCause() {
@@ -226,11 +227,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param checkpointStateHandles all checkpointed operator state
 	 */
 	public void setInitialState(TaskStateHandles checkpointStateHandles) {
-
-		if (state != ExecutionState.CREATED) {
-			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
-		}
-
+		checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
 		this.taskStateHandles = checkpointStateHandles;
 	}
 
@@ -343,7 +340,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
 			this.assignedResource = slot;
-			this.assignedResourceLocation = slot.getTaskManagerLocation();
 
 			// race double check, did we fail/cancel and do we need to release the slot?
 			if (this.state != DEPLOYING) {
@@ -353,7 +349,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
-						attemptNumber, assignedResourceLocation.getHostname()));
+						attemptNumber, getAssignedResourceLocation().getHostname()));
 			}
 
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
@@ -373,12 +369,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				@Override
 				public Void apply(Throwable failure) {
 					if (failure instanceof TimeoutException) {
-						String taskname = vertex.getTaskName() + '(' +
-							(getParallelSubtaskIndex() + 1) + '/' +
-							vertex.getTotalNumberOfParallelSubtasks() + ") (" + attemptId + ')';
+						String taskname = vertex.getTaskNameWithSubtaskIndex()+ " (" + attemptId + ')';
 
 						markFailed(new Exception(
-							"Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
+							"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
 								+ ") not responding after a timeout of " + timeout, failure));
 					}
 					else {

http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index 01d0654..956a2a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -219,7 +219,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
 
 	@Override
 	public int compareTo(@Nonnull TaskManagerLocation o) {
-		// decide based on address first
+		// decide based on resource ID first
 		int resourceIdCmp = this.resourceID.getResourceIdString().compareTo(o.resourceID.getResourceIdString());
 		if (resourceIdCmp != 0) {
 			return resourceIdCmp;