You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/03 12:37:30 UTC
[4/6] flink git commit: [hotfix] Minor code cleanups in the
ExecutionGraph's Execution
[hotfix] Minor code cleanups in the ExecutionGraph's Execution
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10e4e321
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10e4e321
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10e4e321
Branch: refs/heads/master
Commit: 10e4e321b335b6f9376501f90715e31b71b02da8
Parents: 2e107b1
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 31 17:21:36 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 3 10:28:23 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 24 ++++++++------------
.../taskmanager/TaskManagerLocation.java | 2 +-
2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 18a4445..c2fe5ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
+
import org.slf4j.Logger;
import java.util.ArrayList;
@@ -70,6 +71,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
@@ -112,7 +114,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private final Time timeout;
- private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
+ private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
private volatile ExecutionState state = CREATED;
@@ -120,8 +122,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private volatile Throwable failureCause; // once assigned, never changes
- private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution
-
private TaskStateHandles taskStateHandles;
/** The executor which is used to execute futures. */
@@ -189,7 +189,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public TaskManagerLocation getAssignedResourceLocation() {
- return assignedResourceLocation;
+ // returns non-null only when a location is already assigned
+ return assignedResource != null ? assignedResource.getTaskManagerLocation() : null;
}
public Throwable getFailureCause() {
@@ -226,11 +227,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @param checkpointStateHandles all checkpointed operator state
*/
public void setInitialState(TaskStateHandles checkpointStateHandles) {
-
- if (state != ExecutionState.CREATED) {
- throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
- }
-
+ checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
this.taskStateHandles = checkpointStateHandles;
}
@@ -343,7 +340,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
}
this.assignedResource = slot;
- this.assignedResourceLocation = slot.getTaskManagerLocation();
// race double check, did we fail/cancel and do we need to release the slot?
if (this.state != DEPLOYING) {
@@ -353,7 +349,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
- attemptNumber, assignedResourceLocation.getHostname()));
+ attemptNumber, getAssignedResourceLocation().getHostname()));
}
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
@@ -373,12 +369,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public Void apply(Throwable failure) {
if (failure instanceof TimeoutException) {
- String taskname = vertex.getTaskName() + '(' +
- (getParallelSubtaskIndex() + 1) + '/' +
- vertex.getTotalNumberOfParallelSubtasks() + ") (" + attemptId + ')';
+ String taskname = vertex.getTaskNameWithSubtaskIndex()+ " (" + attemptId + ')';
markFailed(new Exception(
- "Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
+ "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+ ") not responding after a timeout of " + timeout, failure));
}
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index 01d0654..956a2a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -219,7 +219,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
@Override
public int compareTo(@Nonnull TaskManagerLocation o) {
- // decide based on address first
+ // decide based on resource ID first
int resourceIdCmp = this.resourceID.getResourceIdString().compareTo(o.resourceID.getResourceIdString());
if (resourceIdCmp != 0) {
return resourceIdCmp;