You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:13:07 UTC
[43/63] [abbrv] git commit: Adjust ExecutionGraph state machine to
TaskManager's failing model (direct transitions to canceled)
Adjust ExecutionGraph state machine to TaskManager's failing model (direct transitions to canceled)
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/25acb6ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/25acb6ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/25acb6ba
Branch: refs/heads/master
Commit: 25acb6ba7724f40ac041a499b607ac0206eadc97
Parents: ae139f5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 11 16:31:50 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200
----------------------------------------------------------------------
.../flink/core/memory/DataInputViewStream.java | 2 +-
.../deployment/TaskDeploymentDescriptor.java | 4 +
.../runtime/execution/RuntimeEnvironment.java | 62 +-
.../flink/runtime/executiongraph/Execution.java | 181 +++--
.../runtime/executiongraph/ExecutionGraph.java | 267 ++++----
.../executiongraph/ExecutionJobVertex.java | 4 +-
.../runtime/executiongraph/ExecutionVertex.java | 5 +-
.../executiongraph/IntermediateResult.java | 9 +
.../IntermediateResultPartition.java | 1 +
.../runtime/instance/InstanceDiedException.java | 25 +-
.../runtime/io/network/ChannelManager.java | 2 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 12 +
.../flink/runtime/jobgraph/JobStatus.java | 27 +-
.../runtime/jobmanager/EventCollector.java | 6 +
.../flink/runtime/jobmanager/JobManager.java | 20 +-
.../jobmanager/scheduler/DefaultScheduler.java | 413 ------------
.../runtime/jobmanager/scheduler/Scheduler.java | 447 +++++++++++++
.../apache/flink/runtime/taskmanager/Task.java | 35 +-
.../flink/runtime/taskmanager/TaskManager.java | 39 +-
.../runtime/util/ExecutorThreadFactory.java | 4 +-
.../ExecutionGraphDeploymentTest.java | 4 +-
.../ExecutionVertexCancelTest.java | 45 +-
.../ExecutionVertexSchedulingTest.java | 8 +-
.../runtime/jobmanager/JobManagerITCase.java | 670 ++++++++++++++++++-
.../scheduler/SchedulerIsolatedTasksTest.java | 12 +-
.../scheduler/SchedulerSlotSharingTest.java | 14 +-
.../runtime/taskmanager/TaskManagerTest.java | 14 -
.../util/EnvironmentInformationTest.java | 25 +-
.../runtime/util/KeyGroupedIteratorTest.java | 1 -
29 files changed, 1623 insertions(+), 735 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
index 25835f5..8fe7b03 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStream.java
@@ -36,7 +36,7 @@ public final class DataInputViewStream extends InputStream {
return inputView.readByte();
}
- public int read(byte b[], int off, int len) throws IOException {
+ public int read(byte[] b, int off, int len) throws IOException {
inputView.readFully(b, off, len);
return len;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 2d00f40..e1e80f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -228,6 +228,10 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
return inputGates;
}
+ public String[] getRequiredJarFiles() {
+ return requiredJarFiles;
+ }
+
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index eef081c..79a4aaa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -113,7 +113,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
/** The thread executing the task in the environment. */
- private volatile Thread executingThread;
+ private Thread executingThread;
/**
* The RPC proxy to report accumulators to JobManager
@@ -221,7 +221,6 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
@Override
public void run() {
-
// quick fail in case the task was cancelled while the tread was started
if (owner.isCanceled()) {
owner.cancelingDone();
@@ -238,6 +237,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
}
}
catch (Throwable t) {
+
if (!this.owner.isCanceled()) {
// Perform clean up when the task failed and has been not canceled by the user
@@ -260,7 +260,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
return;
}
-
+
try {
// If there is any unclosed input gate, close it and propagate close operation to corresponding output gate
closeInputGates();
@@ -275,6 +275,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
waitForOutputChannelsToBeClosed();
}
catch (Throwable t) {
+
// Release all resources that may currently be allocated by the individual channels
releaseAllChannelResources();
@@ -396,37 +397,38 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
}
final Thread executingThread = this.executingThread;
-
- // interrupt the running thread and wait for it to die
- executingThread.interrupt();
-
- try {
- executingThread.join(5000);
- } catch (InterruptedException e) {}
-
- if (!executingThread.isAlive()) {
- return;
- }
-
- // Continuously interrupt the user thread until it changed to state CANCELED
- while (executingThread != null && executingThread.isAlive()) {
- LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
-
- if (LOG.isDebugEnabled()) {
- StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
-
- StackTraceElement[] stack = executingThread.getStackTrace();
- for (StackTraceElement e : stack) {
- bld.append(e).append('\n');
- }
- LOG.debug(bld.toString());
- }
-
+ if (executingThread != null) {
+ // interrupt the running thread and wait for it to die
executingThread.interrupt();
try {
- executingThread.join(1000);
+ executingThread.join(5000);
} catch (InterruptedException e) {}
+
+ if (!executingThread.isAlive()) {
+ return;
+ }
+
+ // Continuously interrupt the user thread until it changed to state CANCELED
+ while (executingThread != null && executingThread.isAlive()) {
+ LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
+
+ if (LOG.isDebugEnabled()) {
+ StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
+
+ StackTraceElement[] stack = executingThread.getStackTrace();
+ for (StackTraceElement e : stack) {
+ bld.append(e).append('\n');
+ }
+ LOG.debug(bld.toString());
+ }
+
+ executingThread.interrupt();
+
+ try {
+ executingThread.join(1000);
+ } catch (InterruptedException e) {}
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 8cfc7fd..c290883 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,7 +18,14 @@
package org.apache.flink.runtime.executiongraph;
-import static org.apache.flink.runtime.execution.ExecutionState.*;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
+import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
+import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
+import static org.apache.flink.runtime.execution.ExecutionState.DEPLOYING;
+import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
+import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -27,9 +34,9 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
@@ -124,6 +131,10 @@ public class Execution {
return failureCause;
}
+ public long[] getStateTimestamps() {
+ return stateTimestamps;
+ }
+
public long getStateTimestamp(ExecutionState state) {
return this.stateTimestamps[state.ordinal()];
}
@@ -146,16 +157,13 @@ public class Execution {
* @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
* @throws NoResourceAvailableException Thrown is no queued scheduling is allowed and no resources are currently available.
*/
- public void scheduleForExecution(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+ public void scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
if (scheduler == null) {
throw new NullPointerException();
}
if (transitionState(CREATED, SCHEDULED)) {
- // record that we were scheduled
- vertex.notifyStateTransition(attemptId, SCHEDULED, null);
-
ScheduledUnit toSchedule = new ScheduledUnit(this, vertex.getJobVertex().getSlotSharingGroup());
// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
@@ -221,8 +229,6 @@ public class Execution {
// this should actually not happen and indicates a race somewhere else
throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
}
-
- vertex.notifyStateTransition(attemptId, DEPLOYING, null);
}
else {
// vertex may have been cancelled, or it was already scheduled
@@ -236,9 +242,15 @@ public class Execution {
}
this.assignedResource = slot;
+ // race double check, did we fail/cancel and do we need to release the slot?
+ if (this.state != DEPLOYING) {
+ slot.releaseSlot();
+ return;
+ }
+
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot);
- // register this execution at the execution graph, to receive callbacks
+ // register this execution at the execution graph, to receive call backs
vertex.getExecutionGraph().registerExecution(this);
// we execute the actual deploy call in a concurrent action to prevent this call from blocking for long
@@ -300,7 +312,6 @@ public class Execution {
else if (current == RUNNING || current == DEPLOYING) {
// try to transition to canceling, if successful, send the cancel call
if (transitionState(current, CANCELING)) {
- vertex.notifyStateTransition(attemptId, CANCELING, null);
sendCancelRpcCall();
return;
}
@@ -318,7 +329,7 @@ public class Execution {
// we skip the canceling state. set the timestamp, for a consistent appearance
markTimestamp(CANCELING, getStateTimestamp(CANCELED));
- vertex.notifyStateTransition(attemptId, CANCELED, null);
+ vertex.executionCanceled();
return;
}
// else: fall through the loop
@@ -336,11 +347,7 @@ public class Execution {
* @param t The exception that caused the task to fail.
*/
public void fail(Throwable t) {
- if (processFail(t, false)) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Task " + getVertexWithAttempt() + " was failed.", t);
- }
- }
+ processFail(t, false);
}
// --------------------------------------------------------------------------------------------
@@ -355,15 +362,11 @@ public class Execution {
* @param t The exception that caused the task to fail.
*/
void markFailed(Throwable t) {
- // the call returns true if it actually made the state transition (was not already failed before, etc)
- if (processFail(t, true)) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Task " + getVertexWithAttempt() + " failed.", t);
- }
- }
+ processFail(t, true);
}
void markFinished() {
+
// this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!)
while (true) {
ExecutionState current = this.state;
@@ -372,7 +375,6 @@ public class Execution {
if (transitionState(current, FINISHED)) {
try {
- vertex.notifyStateTransition(attemptId, FINISHED, null);
vertex.executionFinished();
return;
}
@@ -382,41 +384,60 @@ public class Execution {
}
}
}
- else {
- if (current == CANCELED || current == CANCELING || current == FAILED) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task FINISHED, but concurrently went to state " + state);
- }
- return;
- }
- else {
- // this should not happen, we need to fail this
- markFailed(new Exception("Vertex received FINISHED message while being in state " + state));
- return;
+ else if (current == CANCELING) {
+ // we sent a cancel call, and the task manager finished before it arrived. We
+ // will never get a CANCELED call back from the task manager
+ cancelingComplete();
+ return;
+ }
+ else if (current == CANCELED || current == FAILED) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Task FINISHED, but concurrently went to state " + state);
}
+ return;
+ }
+ else {
+ // this should not happen, we need to fail this
+ markFailed(new Exception("Vertex received FINISHED message while being in state " + state));
+ return;
}
}
}
void cancelingComplete() {
- if (transitionState(CANCELING, CANCELED)) {
- try {
- vertex.executionCanceled();
- vertex.notifyStateTransition(attemptId, CANCELED, null);
- }
- finally {
- vertex.getExecutionGraph().deregisterExecution(this);
- assignedResource.releaseSlot();
+
+ // The task managers can cancel tasks on their own, without an external trigger, if they find that the
+ // network stack is canceled (for example by a failing / canceling receiver or sender).
+ // This is an artifact of the old network runtime, but for now we need to support task transitions
+ // from running directly to canceled.
+
+ while (true) {
+ ExecutionState current = this.state;
+
+ if (current == CANCELED) {
+ return;
}
- }
- else {
- ExecutionState actualState = this.state;
- // failing in the meantime may happen and is no problem.
- // anything else is a serious problem !!!
- if (actualState != FAILED) {
- String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
- LOG.error(message);
- vertex.getExecutionGraph().fail(new Exception(message));
+ else if (current == CANCELING || current == RUNNING) {
+ if (transitionState(current, CANCELED)) {
+ try {
+ vertex.executionCanceled();
+ }
+ finally {
+ vertex.getExecutionGraph().deregisterExecution(this);
+ assignedResource.releaseSlot();
+ }
+ return;
+ }
+ }
+ else {
+ // failing in the meantime may happen and is no problem.
+ // anything else is a serious problem !!!
+ if (current != FAILED) {
+ String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
+ LOG.error(message);
+ vertex.getExecutionGraph().fail(new Exception(message));
+ }
+ return;
}
}
}
@@ -440,17 +461,29 @@ public class Execution {
return false;
}
- if (current == CANCELED || (current == CANCELING && isCallback)) {
+ if (current == CANCELED) {
// we are already aborting or are already aborted
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s",
- getVertexWithAttempt(), FAILED, current));
+ getVertexWithAttempt(), FAILED, CANCELED));
}
return false;
}
- if (transitionState(current, FAILED)) {
+ if (transitionState(current, FAILED, t)) {
// success (in a manner of speaking)
+ this.failureCause = t;
+
+ try {
+ vertex.getExecutionGraph().deregisterExecution(this);
+ vertex.executionFailed(t);
+ }
+ finally {
+ if (assignedResource != null) {
+ assignedResource.releaseSlot();
+ }
+ }
+
if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
if (LOG.isDebugEnabled()) {
@@ -467,29 +500,16 @@ public class Execution {
}
}
- try {
- this.failureCause = t;
- vertex.executionFailed(t);
- vertex.notifyStateTransition(attemptId, FAILED, t);
- }
- finally {
- if (assignedResource != null) {
- assignedResource.releaseSlot();
- }
- vertex.getExecutionGraph().deregisterExecution(this);
- }
-
// leave the loop
return true;
}
}
}
- private void switchToRunning() {
+ private boolean switchToRunning() {
- // transition state, the common case
if (transitionState(DEPLOYING, RUNNING)) {
- vertex.notifyStateTransition(attemptId, RUNNING, null);
+ return true;
}
else {
// something happened while the call was in progress.
@@ -501,10 +521,10 @@ public class Execution {
ExecutionState currentState = this.state;
if (currentState == FINISHED || currentState == CANCELED) {
- // do nothing, this is nice, the task was really fast
+ // do nothing, the task was really fast (nice)
+ // or it was canceled really fast
}
-
- if (currentState == CANCELING || currentState == FAILED) {
+ else if (currentState == CANCELING || currentState == FAILED) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", getVertexWithAttempt()));
}
@@ -524,13 +544,15 @@ public class Execution {
// record the failure
markFailed(new Exception(message));
}
+
+ return false;
}
}
private void sendCancelRpcCall() {
final AllocatedSlot slot = this.assignedResource;
if (slot == null) {
- throw new IllegalStateException("Cannot cancel when task was not running or deployed.");
+ return;
}
Runnable cancelAction = new Runnable() {
@@ -578,8 +600,21 @@ public class Execution {
// --------------------------------------------------------------------------------------------
private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
+ return transitionState(currentState, targetState, null);
+ }
+
+ private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
markTimestamp(targetState);
+
+ // make sure that the state transition completes normally.
+ // potential errors in listeners must not affect the main logic
+ try {
+ vertex.notifyStateTransition(attemptId, targetState, error);
+ }
+ catch (Throwable t) {
+ LOG.error("Error while notifying execution graph of execution state trnsition.", t);
+ }
return true;
} else {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 72525e9..3dab13e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,15 +38,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.execution.ExecutionListener;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
+import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.ExceptionUtils;
@@ -105,7 +108,9 @@ public class ExecutionGraph {
private volatile JobStatus state = JobStatus.CREATED;
- private boolean allowQueuedScheduling = false;
+ private Scheduler scheduler;
+
+ private boolean allowQueuedScheduling = true;
public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig) {
@@ -265,12 +270,17 @@ public class ExecutionGraph {
// Actions
// --------------------------------------------------------------------------------------------
- public void scheduleForExecution(DefaultScheduler scheduler) throws JobException {
+ public void scheduleForExecution(Scheduler scheduler) throws JobException {
if (scheduler == null) {
throw new IllegalArgumentException("Scheduler must not be null.");
}
+ if (this.scheduler != null && this.scheduler != scheduler) {
+ throw new IllegalArgumentException("Cann not use different schedulers for the same job");
+ }
+
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
+ this.scheduler = scheduler;
// initially, we simply take the ones without inputs.
// next, we implement the logic to go back from vertices that need computation
@@ -289,32 +299,35 @@ public class ExecutionGraph {
public void cancel() {
while (true) {
JobStatus current = state;
+
if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
if (transitionState(current, JobStatus.CANCELLING)) {
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
ejv.cancel();
}
+ return;
}
}
-
- // no need to treat other states
+ else {
+ // no need to treat other states
+ return;
+ }
}
}
public void fail(Throwable t) {
- if (LOG.isErrorEnabled()) {
- LOG.error(String.format("Failing ExecutionGraph %s (%s): ", getJobID(), getJobName()), t);
- }
-
while (true) {
JobStatus current = state;
- if (current != JobStatus.FAILED) {
- if (transitionState(current, JobStatus.FAILED, t)) {
- // cancel all. what is failed will not cancel but stay failed
- for (ExecutionJobVertex ejv : verticesInCreationOrder) {
- ejv.cancel();
- }
+ if (current == JobStatus.FAILED || current == JobStatus.FAILING) {
+ return;
+ }
+ else if (transitionState(current, JobStatus.FAILING, t)) {
+ // cancel all. what is failed will not cancel but stay failed
+ for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+ ejv.cancel();
}
+
+ return;
}
// no need to treat other states
@@ -362,6 +375,7 @@ public class ExecutionGraph {
nextVertexToFinish = nextPos;
if (nextPos == verticesInCreationOrder.size()) {
+
// we are done, transition to the final state
while (true) {
@@ -372,7 +386,7 @@ public class ExecutionGraph {
if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
break;
}
- if (current == JobStatus.FAILED) {
+ if (current == JobStatus.FAILING && transitionState(current, JobStatus.FAILED)) {
break;
}
if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
@@ -417,99 +431,118 @@ public class ExecutionGraph {
}
public ConnectionInfoLookupResponse lookupConnectionInfoAndDeployReceivers(InstanceConnectionInfo caller, ChannelID sourceChannelID) {
- //TODO
- return null;
-// final InternalJobStatus jobStatus = eg.getJobStatus();
-// if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.CANCELING) {
-// return ConnectionInfoLookupResponse.createJobIsAborting();
-// }
-//
-// final ExecutionEdge edge = eg.getEdgeByID(sourceChannelID);
-// if (edge == null) {
-// LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
-// return ConnectionInfoLookupResponse.createReceiverNotFound();
-// }
-//
-// if (sourceChannelID.equals(edge.getInputChannelID())) {
-// // Request was sent from an input channel
-//
-// final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex();
-//
-// final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance();
-// if (assignedInstance == null) {
-// LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputGateIndex()
-// + " but no instance assigned");
-// // LOG.info("Created receiverNotReady for " + connectedVertex + " 1");
-// return ConnectionInfoLookupResponse.createReceiverNotReady();
-// }
-//
-// // Check execution state
-// final ExecutionState executionState = connectedVertex.getExecutionState();
-// if (executionState == ExecutionState.FINISHED) {
-// // that should not happen. if there is data pending, the receiver cannot be ready
-// return ConnectionInfoLookupResponse.createReceiverNotFound();
-// }
-//
-// // running is common, finishing is happens when the lookup is for the close event
-// if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) {
-// // LOG.info("Created receiverNotReady for " + connectedVertex + " in state " + executionState + " 2");
-// return ConnectionInfoLookupResponse.createReceiverNotReady();
-// }
-//
-// if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
-// // Receiver runs on the same task manager
-// return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID());
-// } else {
-// // Receiver runs on a different task manager
-//
-// final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
-// final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-//
-// return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
-// }
-// }
-// // else, the request is for an output channel
-// // Find vertex of connected input channel
-// final ExecutionVertex targetVertex = edge.getInputGate().getVertex();
-//
-// // Check execution state
-// final ExecutionState executionState = targetVertex.getExecutionState();
-//
-// // check whether the task needs to be deployed
-// if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) {
-//
-// if (executionState == ExecutionState.ASSIGNED) {
-// final Runnable command = new Runnable() {
-// @Override
-// public void run() {
-// scheduler.deployAssignedVertices(targetVertex);
-// }
-// };
-// eg.executeCommand(command);
-// }
-//
-// // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3");
-// return ConnectionInfoLookupResponse.createReceiverNotReady();
-// }
-//
-// final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance();
-// if (assignedInstance == null) {
-// LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned");
-// // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4");
-// return ConnectionInfoLookupResponse.createReceiverNotReady();
-// }
-//
-// if (assignedInstance.getInstanceConnectionInfo().equals(caller)) {
-// // Receiver runs on the same task manager
-// return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID());
-// } else {
-// // Receiver runs on a different task manager
-// final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo();
-// final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
-//
-// return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID()));
-// }
+ final ExecutionEdge edge = edges.get(sourceChannelID);
+ if (edge == null) {
+ // that is bad, we need to fail the job
+ LOG.error("Cannot find execution edge associated with ID " + sourceChannelID);
+ fail(new Exception("Channels are not correctly registered"));
+ return ConnectionInfoLookupResponse.createReceiverNotFound();
+ }
+
+
+ // ----- Request was sent from an input channel (receiver side), requesting the output channel (sender side) ------
+ // ----- This is the case for backwards events ------
+
+ if (sourceChannelID.equals(edge.getInputChannelId())) {
+ final ExecutionVertex targetVertex = edge.getSource().getProducer();
+ final ExecutionState executionState = targetVertex.getExecutionState();
+
+ // common case - found task running
+ if (executionState == ExecutionState.RUNNING) {
+ Instance location = targetVertex.getCurrentAssignedResource().getInstance();
+
+ if (location.getInstanceConnectionInfo().equals(caller)) {
+ // Receiver runs on the same task manager
+ return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelId());
+ }
+ else {
+ // Receiver runs on a different task manager
+ final InstanceConnectionInfo ici = location.getInstanceConnectionInfo();
+ final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
+
+ int connectionIdx = edge.getSource().getIntermediateResult().getConnectionIndex();
+ return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, connectionIdx));
+ }
+ }
+ else if (executionState == ExecutionState.FINISHED) {
+ // that should not happen. if there is data pending, the sender cannot yet be done
+ // we need to fail the whole affair
+ LOG.error("Receiver " + targetVertex + " set to FINISHED even though data is pending");
+ fail(new Exception("Channels are not correctly registered"));
+ return ConnectionInfoLookupResponse.createReceiverNotFound();
+ }
+ else if (executionState == ExecutionState.FAILED || executionState == ExecutionState.CANCELED ||
+ executionState == ExecutionState.CANCELING)
+ {
+ return ConnectionInfoLookupResponse.createJobIsAborting();
+ }
+ else {
+ // all other states should not be, because the sender cannot be in CREATED, SCHEDULED, or DEPLOYING
+ // state when the receiver is already running
+ LOG.error("Channel lookup (backwards) - sender " + targetVertex + " found in inconsistent state " + executionState);
+ fail(new Exception("Channels are not correctly registered"));
+ return ConnectionInfoLookupResponse.createReceiverNotFound();
+ }
+ }
+
+ // ----- Request was sent from an output channel (sender side), requesting the input channel (receiver side) ------
+ // ----- This is the case for forward data ------
+
+ final ExecutionVertex targetVertex = edge.getTarget();
+ final ExecutionState executionState = targetVertex.getExecutionState();
+
+ if (executionState == ExecutionState.RUNNING) {
+
+ // already online
+ Instance location = targetVertex.getCurrentAssignedResource().getInstance();
+
+ if (location.getInstanceConnectionInfo().equals(caller)) {
+ // Receiver runs on the same task manager
+ return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelId());
+ }
+ else {
+ // Receiver runs on a different task manager
+ final InstanceConnectionInfo ici = location.getInstanceConnectionInfo();
+ final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort());
+
+ final int connectionIdx = edge.getSource().getIntermediateResult().getConnectionIndex();
+ return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, connectionIdx));
+ }
+ }
+ else if (executionState == ExecutionState.DEPLOYING || executionState == ExecutionState.SCHEDULED) {
+ return ConnectionInfoLookupResponse.createReceiverNotReady();
+ }
+ else if (executionState == ExecutionState.CREATED) {
+ // bring the receiver online
+ try {
+ edge.getTarget().scheduleForExecution(scheduler, false);
+
+ // delay the requester
+ return ConnectionInfoLookupResponse.createReceiverNotReady();
+ }
+ catch (JobException e) {
+ fail(new Exception("Cannot schedule the receivers, not enough resources."));
+ return ConnectionInfoLookupResponse.createJobIsAborting();
+ }
+ }
+ else if (executionState == ExecutionState.CANCELED || executionState == ExecutionState.CANCELING ||
+ executionState == ExecutionState.FAILED)
+ {
+ return ConnectionInfoLookupResponse.createJobIsAborting();
+ }
+ else {
+ // all other states are illegal here - the only state not handled above is FINISHED,
+ // which is inconsistent for a receiver that its sender is still trying to look up
+ String message = "Channel lookup (forward) - receiver " + targetVertex + " found in inconsistent state " + executionState;
+ LOG.error(message);
+ fail(new Exception(message));
+ return ConnectionInfoLookupResponse.createReceiverNotFound();
+ }
+ }
+
+ public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
+ return Collections.unmodifiableMap(currentExecutions);
}
void registerExecution(Execution exec) {
@@ -521,18 +554,15 @@ public class ExecutionGraph {
void deregisterExecution(Execution exec) {
Execution contained = currentExecutions.remove(exec.getAttemptId());
+
if (contained != null && contained != exec) {
fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
}
}
- Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
- return Collections.unmodifiableMap(currentExecutions);
- }
-
void registerExecutionEdge(ExecutionEdge edge) {
- ChannelID source = edge.getInputChannelId();
ChannelID target = edge.getInputChannelId();
+ ChannelID source = edge.getOutputChannelId();
edges.put(source, edge);
edges.put(target, edge);
}
@@ -581,6 +611,7 @@ public class ExecutionGraph {
*/
void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionId, ExecutionState newExecutionState, Throwable error) {
+ // we must be very careful here with exceptions
if (this.executionListeners.size() > 0) {
String message = error == null ? null : ExceptionUtils.stringifyException(error);
@@ -592,11 +623,11 @@ public class ExecutionGraph {
LOG.error("Notification of execution state change caused an error.", t);
}
}
-
- // see what this means for us. currently, the first FAILED state means -> FAILED
- if (newExecutionState == ExecutionState.FAILED) {
- fail(error);
- }
+ }
+
+ // see what this means for us. currently, the first FAILED state means -> FAILED
+ if (newExecutionState == ExecutionState.FAILED) {
+ fail(error);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 440566d..1884ce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -225,7 +225,7 @@ public class ExecutionJobVertex {
// Actions
//---------------------------------------------------------------------------------------------
- public void scheduleAll(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+ public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
for (ExecutionVertex ev : getTaskVertices()) {
ev.scheduleForExecution(scheduler, queued);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index bbc0c97..3c65f2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
-
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -39,7 +38,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
/**
@@ -305,7 +304,7 @@ public class ExecutionVertex {
}
}
- public void scheduleForExecution(DefaultScheduler scheduler, boolean queued) throws NoResourceAvailableException {
+ public void scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
this.currentExecution.scheduleForExecution(scheduler, queued);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 0b822ab..f770b87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -34,6 +34,8 @@ public class IntermediateResult {
private int numConsumers;
+ private final int connectionIndex;
+
public IntermediateResult(IntermediateDataSetID id, ExecutionJobVertex producer, int numParallelProducers) {
this.id = id;
@@ -43,6 +45,9 @@ public class IntermediateResult {
// we do not set the intermediate result partitions here, because we let them be initialized by
// the execution vertex that produces them
+
+ // assign a random connection index
+ this.connectionIndex = (int) (Math.random() * Integer.MAX_VALUE);
}
public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
@@ -87,4 +92,8 @@ public class IntermediateResult {
}
return index;
}
+
+ public int getConnectionIndex() {
+ return connectionIndex;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 1c4e1fb..1cc5e13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -39,6 +39,7 @@ public class IntermediateResultPartition {
this.consumers = new ArrayList<List<ExecutionEdge>>(0);
}
+
public ExecutionVertex getProducer() {
return producer;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
index 42b9817..69e41d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceDiedException.java
@@ -1,17 +1,20 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.runtime.instance;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 3066bb5..602f88b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -366,7 +366,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
}
else if (lookupResponse.receiverNotReady()) {
try {
- Thread.sleep(500);
+ Thread.sleep(100);
} catch (InterruptedException e) {
if (reportException) {
throw new IOException("Lookup was interrupted.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f8b5ab9..85978fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -71,6 +71,8 @@ public class JobGraph implements IOReadableWritable {
/** Name of this job. */
private String jobName;
+ private boolean allowQueuedScheduling;
+
// --------------------------------------------------------------------------------------------
/**
@@ -162,6 +164,14 @@ public class JobGraph implements IOReadableWritable {
public Configuration getJobConfiguration() {
return this.jobConfiguration;
}
+
+ public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
+ this.allowQueuedScheduling = allowQueuedScheduling;
+ }
+
+ public boolean getAllowQueuedScheduling() {
+ return allowQueuedScheduling;
+ }
/**
* Adds a new task vertex to the job graph if it is not already included.
@@ -304,6 +314,7 @@ public class JobGraph implements IOReadableWritable {
this.jobID.read(in);
this.jobName = StringValue.readString(in);
this.jobConfiguration.read(in);
+ this.allowQueuedScheduling = in.readBoolean();
final int numVertices = in.readInt();
@@ -332,6 +343,7 @@ public class JobGraph implements IOReadableWritable {
this.jobID.write(out);
StringValue.writeString(this.jobName, out);
this.jobConfiguration.write(out);
+ out.writeBoolean(allowQueuedScheduling);
// write the task vertices using java serialization (to resolve references in the object graph)
out.writeInt(taskVertices.size());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 60b2edf..f5a2e9c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -24,20 +24,35 @@ package org.apache.flink.runtime.jobgraph;
public enum JobStatus {
/** Job is newly created, no task has started to run. */
- CREATED,
+ CREATED(false),
/** Some tasks are scheduled or running, some may be pending, some may be finished. */
- RUNNING,
+ RUNNING(false),
+ /** The job has failed and is currently waiting for the cleanup to complete */
+ FAILING(false),
+
/** The job has failed due to a non-recoverable task failure */
- FAILED,
+ FAILED(true),
/** Job is being cancelled */
- CANCELLING,
+ CANCELLING(false),
/** Job has been cancelled */
- CANCELED,
+ CANCELED(true),
/** All of the job's tasks have successfully finished. */
- FINISHED
+ FINISHED(true);
+
+ // --------------------------------------------------------------------------------------------
+
+ private final boolean terminalState;
+
+ private JobStatus(boolean terminalState) {
+ this.terminalState = terminalState;
+ }
+
+ public boolean isTerminalState() {
+ return terminalState;
+ }
};
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
index a45507a..551dce2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java
@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
import org.apache.flink.runtime.event.job.JobEvent;
@@ -52,6 +54,8 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent;
*/
public final class EventCollector extends TimerTask implements ProfilingListener {
+ private static final Log LOG = LogFactory.getLog(EventCollector.class);
+
/**
* The execution listener wrapper is an auxiliary class. It is required
* because the job vertex ID and the management vertex ID cannot be accessed from
@@ -93,6 +97,8 @@ public final class EventCollector extends TimerTask implements ProfilingListener
executionId, newExecutionState);
this.eventCollector.addEvent(jobID, executionStateChangeEvent);
+
+ LOG.info(vertexEvent.toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index d3a920c..3526e15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -79,7 +79,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
@@ -122,7 +122,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
private final InstanceManager instanceManager;
/** Assigns tasks to slots and keeps track of available and allocated task slots */
- private final DefaultScheduler scheduler;
+ private final Scheduler scheduler;
/** The currently running jobs */
private final ConcurrentHashMap<JobID, ExecutionGraph> currentJobs;
@@ -143,7 +143,6 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
private volatile boolean isShutDown;
-
private WebInfoServer server;
@@ -222,7 +221,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
}
// create the scheduler and make it listen at the availability of new instances
- this.scheduler = new DefaultScheduler();
+ this.scheduler = new Scheduler(this.executorService);
this.instanceManager.addInstanceListener(this.scheduler);
}
@@ -349,6 +348,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Successfully created execution graph from job graph %s (%s)", job.getJobID(), job.getName()));
}
+
+ // should the job fail if a vertex cannot be deployed immediately (streams, closed iterations)
+ executionGraph.setQueuedSchedulingAllowed(job.getAllowQueuedScheduling());
// Register job with the progress collector
if (this.eventCollector != null) {
@@ -461,12 +463,12 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
if (LOG.isInfoEnabled()) {
String message = optionalMessage == null ? "." : ": " + optionalMessage;
- LOG.info(String.format("Status of job %s (%s) changed to %s%s",
+ LOG.info(String.format("Job %s (%s) switched to %s%s",
jid, executionGraph.getJobName(), newJobStatus, message));
}
// remove the job graph if the state is any terminal state
- if (newJobStatus == JobStatus.FINISHED || newJobStatus == JobStatus.CANCELED || newJobStatus == JobStatus.FAILED) {
+ if (newJobStatus.isTerminalState()) {
this.currentJobs.remove(jid);
try {
@@ -643,7 +645,11 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
@Override
public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) {
- return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots);
+ if (this.instanceManager != null) {
+ return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots);
+ } else {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
deleted file mode 100644
index 54e16b9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.ArrayDeque;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.AllocatedSlot;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceDiedException;
-import org.apache.flink.runtime.instance.InstanceListener;
-
-/**
- * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
- * slots.
- */
-public class DefaultScheduler implements InstanceListener, SlotAvailablilityListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(DefaultScheduler.class);
-
-
- private final Object globalLock = new Object();
-
-
- /** All instances that the scheduler can deploy to */
- private final Set<Instance> allInstances = new HashSet<Instance>();
-
- /** All instances that still have available resources */
- private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();
-
- /** All tasks pending to be scheduled */
- private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
-
-
- private int unconstrainedAssignments = 0;
-
- private int localizedAssignments = 0;
-
- private int nonLocalizedAssignments = 0;
-
-
- public DefaultScheduler() {
- }
-
- /**
- * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
- */
- public void shutdown() {
- synchronized (globalLock) {
- for (Instance i : allInstances) {
- i.removeSlotListener();
- i.cancelAndReleaseAllSlots();
- }
- allInstances.clear();
- instancesWithAvailableResources.clear();
- taskQueue.clear();
- }
- }
-
- /**
- *
- * NOTE: In the presence of multi-threaded operations, this number may be inexact.
- *
- * @return The number of empty slots, for tasks.
- */
- public int getNumberOfAvailableSlots() {
- int count = 0;
-
- synchronized (globalLock) {
- for (Instance instance : instancesWithAvailableResources) {
- count += instance.getNumberOfAvailableSlots();
- }
- }
-
- return count;
- }
-
- // --------------------------------------------------------------------------------------------
- // Scheduling
- // --------------------------------------------------------------------------------------------
-
- public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
- Object ret = scheduleTask(task, false);
- if (ret instanceof AllocatedSlot) {
- return (AllocatedSlot) ret;
- }
- else {
- throw new RuntimeException();
- }
- }
-
- public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
- Object ret = scheduleTask(task, true);
- if (ret instanceof AllocatedSlot) {
- return new SlotAllocationFuture((AllocatedSlot) ret);
- }
- if (ret instanceof SlotAllocationFuture) {
- return (SlotAllocationFuture) ret;
- }
- else {
- throw new RuntimeException();
- }
- }
-
- /**
- * Returns either an {@link AllocatedSlot}, or an {@link SlotAllocationFuture}.
- */
- private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
- if (task == null) {
- throw new IllegalArgumentException();
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Scheduling task " + task);
- }
-
- final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
-
- synchronized (globalLock) {
- // 1) If the task has a strict co-schedule hint, obey it, if it has been assigned.
-// CoLocationHint hint = task.getCoScheduleHint();
-// if (hint != null) {
-//
-// // try to add to the slot, or make it wait on the hint and schedule the hint itself
-// if () {
-// return slot;
-// }
-// }
-
- // 2) See if we can place the task somewhere together with another existing task.
- // This is defined by the slot sharing groups
- SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
- if (sharingUnit != null) {
- // see if we can add the task to the current sharing group.
- SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
- AllocatedSlot slot = assignment.getSlotForTask(vertex.getJobvertexId(), vertex);
- if (slot != null) {
- return slot;
- }
- }
-
- // 3) We could not schedule it to an existing slot, so we need to get a new one or queue the task
-
- // we need potentially to loop multiple times, because there may be false positives
- // in the set-with-available-instances
- while (true) {
-
-
- Instance instanceToUse = getFreeInstanceForTask(task.getTaskToExecute().getVertex());
-
- if (instanceToUse != null) {
- try {
- AllocatedSlot slot = instanceToUse.allocateSlot(vertex.getJobId());
-
- // if the instance has further available slots, re-add it to the set of available resources.
- if (instanceToUse.hasResourcesAvailable()) {
- this.instancesWithAvailableResources.add(instanceToUse);
- }
-
- if (slot != null) {
-
- // if the task is in a shared group, assign the slot to that group
- // and get a sub slot in turn
- if (sharingUnit != null) {
- slot = sharingUnit.getTaskAssignment().addSlotWithTask(slot, task.getJobVertexId());
- }
-
- return slot;
- }
- }
- catch (InstanceDiedException e) {
- // the instance died it has not yet been propagated to this scheduler
- // remove the instance from the set of available instances
- this.allInstances.remove(instanceToUse);
- this.instancesWithAvailableResources.remove(instanceToUse);
- }
- }
- else {
- // no resource available now, so queue the request
- if (queueIfNoResource) {
- SlotAllocationFuture future = new SlotAllocationFuture();
- this.taskQueue.add(new QueuedTask(task, future));
- return future;
- }
- else {
- throw new NoResourceAvailableException(task);
- }
- }
- }
- }
- }
-
- /**
- * Gets a suitable instance to schedule the vertex execution to.
- * <p>
- * NOTE: This method is not thread-safe; it needs to be synchronized by the caller.
- *
- * @param vertex The task to run.
- * @return The instance to run the vertex on, or {@code null} if no instance is available.
- */
- protected Instance getFreeInstanceForTask(ExecutionVertex vertex) {
- if (this.instancesWithAvailableResources.isEmpty()) {
- return null;
- }
-
- Iterable<Instance> locationsIterable = vertex.getPreferredLocations();
- Iterator<Instance> locations = locationsIterable == null ? null : locationsIterable.iterator();
-
- if (locations != null && locations.hasNext()) {
-
- while (locations.hasNext()) {
- Instance location = locations.next();
-
- if (location != null && this.instancesWithAvailableResources.remove(location)) {
-
- localizedAssignments++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + location);
- }
-
- return location;
- }
- }
-
- Instance instance = this.instancesWithAvailableResources.poll();
- nonLocalizedAssignments++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instance);
- }
- return instance;
- }
- else {
- Instance instance = this.instancesWithAvailableResources.poll();
- unconstrainedAssignments++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instance);
- }
-
- return instance;
- }
- }
-
- @Override
- public void newSlotAvailable(Instance instance) {
-
- // global lock before instance lock, so that the order of acquiring locks is always 1) global, 2) instance
- synchronized (globalLock) {
- QueuedTask queued = taskQueue.peek();
-
- // the slot was properly released, we can allocate a new one from that instance
-
- if (queued != null) {
- ScheduledUnit task = queued.getTask();
- ExecutionVertex vertex = task.getTaskToExecute().getVertex();
-
- try {
- AllocatedSlot newSlot = instance.allocateSlot(vertex.getJobId());
- if (newSlot != null) {
-
- // success, remove from the task queue and notify the future
- taskQueue.poll();
- if (queued.getFuture() != null) {
- try {
- queued.getFuture().setSlot(newSlot);
- }
- catch (Throwable t) {
- LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
- task.getTaskToExecute().fail(t);
- }
- }
- }
- }
- catch (InstanceDiedException e) {
- this.allInstances.remove(instance);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Instance " + instance + " was marked dead asynchronously.");
- }
- }
- }
- else {
- this.instancesWithAvailableResources.add(instance);
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Instance Availability
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void newInstanceAvailable(Instance instance) {
- if (instance == null) {
- throw new IllegalArgumentException();
- }
- if (instance.getNumberOfAvailableSlots() <= 0) {
- throw new IllegalArgumentException("The given instance has no resources.");
- }
- if (!instance.isAlive()) {
- throw new IllegalArgumentException("The instance is not alive.");
- }
-
- // synchronize globally for instance changes
- synchronized (this.globalLock) {
-
- // check we do not already use this instance
- if (!this.allInstances.add(instance)) {
- throw new IllegalArgumentException("The instance is already contained.");
- }
-
- try {
- instance.setSlotAvailabilityListener(this);
- }
- catch (IllegalStateException e) {
- this.allInstances.remove(instance);
- LOG.error("Scheduler could not attach to the instance as a listener.");
- }
-
-
- // add it to the available resources and let potential waiters know
- this.instancesWithAvailableResources.add(instance);
-
- for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
- newSlotAvailable(instance);
- }
- }
- }
-
- @Override
- public void instanceDied(Instance instance) {
- if (instance == null) {
- throw new IllegalArgumentException();
- }
-
- instance.markDead();
-
- // we only remove the instance from the pools, we do not care about the
- synchronized (this.globalLock) {
- // the instance must not be available anywhere any more
- this.allInstances.remove(instance);
- this.instancesWithAvailableResources.remove(instance);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Status reporting
- // --------------------------------------------------------------------------------------------
-
- public int getNumberOfAvailableInstances() {
- return allInstances.size();
- }
-
- public int getNumberOfInstancesWithAvailableSlots() {
- return instancesWithAvailableResources.size();
- }
-
- public int getNumberOfUnconstrainedAssignments() {
- return unconstrainedAssignments;
- }
-
- public int getNumberOfLocalizedAssignments() {
- return localizedAssignments;
- }
-
- public int getNumberOfNonLocalizedAssignments() {
- return nonLocalizedAssignments;
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static final class QueuedTask {
-
- private final ScheduledUnit task;
-
- private final SlotAllocationFuture future;
-
-
- public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
- this.task = task;
- this.future = future;
- }
-
- public ScheduledUnit getTask() {
- return task;
- }
-
- public SlotAllocationFuture getFuture() {
- return future;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
new file mode 100644
index 0000000..cd57454
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceDiedException;
+import org.apache.flink.runtime.instance.InstanceListener;
+
+/**
+ * The scheduler is responsible for distributing the ready-to-run tasks and assigning them to instances and
+ * slots.
+ */
+public class Scheduler implements InstanceListener, SlotAvailablilityListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+
+
+ private final Object globalLock = new Object();
+
+ private final ExecutorService executor;
+
+
+ /** All instances that the scheduler can deploy to */
+ private final Set<Instance> allInstances = new HashSet<Instance>();
+
+ /** All instances that still have available resources */
+ private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();
+
+ /** All tasks pending to be scheduled */
+ private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
+
+
+ private int unconstrainedAssignments = 0;
+
+ private int localizedAssignments = 0;
+
+ private int nonLocalizedAssignments = 0;
+
+
+ public Scheduler() {
+ this(null);
+ }
+
+ public Scheduler(ExecutorService executorService) {
+ this.executor = executorService;
+ }
+
+
+ /**
+ * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler.
+ */
+ public void shutdown() {
+ synchronized (globalLock) {
+ for (Instance i : allInstances) {
+ i.removeSlotListener();
+ i.cancelAndReleaseAllSlots();
+ }
+ allInstances.clear();
+ instancesWithAvailableResources.clear();
+ taskQueue.clear();
+ }
+ }
+
+ /**
+ *
+ * NOTE: In the presence of multi-threaded operations, this number may be inexact.
+ *
+ * @return The number of empty slots, for tasks.
+ */
+ public int getNumberOfAvailableSlots() {
+ int count = 0;
+
+ synchronized (globalLock) {
+ for (Instance instance : instancesWithAvailableResources) {
+ count += instance.getNumberOfAvailableSlots();
+ }
+ }
+
+ return count;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Scheduling
+ // --------------------------------------------------------------------------------------------
+
+ public AllocatedSlot scheduleImmediately(ScheduledUnit task) throws NoResourceAvailableException {
+ Object ret = scheduleTask(task, false);
+ if (ret instanceof AllocatedSlot) {
+ return (AllocatedSlot) ret;
+ }
+ else {
+ throw new RuntimeException();
+ }
+ }
+
+ public SlotAllocationFuture scheduleQueued(ScheduledUnit task) throws NoResourceAvailableException {
+ Object ret = scheduleTask(task, true);
+ if (ret instanceof AllocatedSlot) {
+ return new SlotAllocationFuture((AllocatedSlot) ret);
+ }
+ if (ret instanceof SlotAllocationFuture) {
+ return (SlotAllocationFuture) ret;
+ }
+ else {
+ throw new RuntimeException();
+ }
+ }
+
+ /**
+ * Returns either an {@link AllocatedSlot}, or a {@link SlotAllocationFuture}.
+ */
+ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
+ if (task == null) {
+ throw new IllegalArgumentException();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling task " + task);
+ }
+
+ final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
+
+ synchronized (globalLock) {
+ // 1) If the task has a strict co-schedule hint, obey it, if it has been assigned.
+// CoLocationHint hint = task.getCoScheduleHint();
+// if (hint != null) {
+//
+// // try to add to the slot, or make it wait on the hint and schedule the hint itself
+// if () {
+// return slot;
+// }
+// }
+
+ // 2) See if we can place the task somewhere together with another existing task.
+ // This is defined by the slot sharing groups
+ SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
+ if (sharingUnit != null) {
+ // see if we can add the task to the current sharing group.
+ SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
+ AllocatedSlot slot = assignment.getSlotForTask(vertex.getJobvertexId(), vertex);
+ if (slot != null) {
+ return slot;
+ }
+ }
+
+ // 3) We could not schedule it to an existing slot, so we need to get a new one or queue the task
+
+ // we potentially need to loop multiple times, because there may be false positives
+ // in the set-with-available-instances
+ while (true) {
+
+
+ Instance instanceToUse = getFreeInstanceForTask(task.getTaskToExecute().getVertex());
+
+ if (instanceToUse != null) {
+ try {
+ AllocatedSlot slot = instanceToUse.allocateSlot(vertex.getJobId());
+
+ // if the instance has further available slots, re-add it to the set of available resources.
+ if (instanceToUse.hasResourcesAvailable()) {
+ this.instancesWithAvailableResources.add(instanceToUse);
+ }
+
+ if (slot != null) {
+
+ // if the task is in a shared group, assign the slot to that group
+ // and get a sub slot in turn
+ if (sharingUnit != null) {
+ slot = sharingUnit.getTaskAssignment().addSlotWithTask(slot, task.getJobVertexId());
+ }
+
+ return slot;
+ }
+ }
+ catch (InstanceDiedException e) {
+ // the instance died, but this has not yet been propagated to this scheduler
+ // remove the instance from the set of available instances
+ this.allInstances.remove(instanceToUse);
+ this.instancesWithAvailableResources.remove(instanceToUse);
+ }
+ }
+ else {
+ // no resource available now, so queue the request
+ if (queueIfNoResource) {
+ SlotAllocationFuture future = new SlotAllocationFuture();
+ this.taskQueue.add(new QueuedTask(task, future));
+ return future;
+ }
+ else {
+ throw new NoResourceAvailableException(task);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Gets a suitable instance to schedule the vertex execution to.
+ * <p>
+ * NOTE: This method is not thread-safe; it needs to be synchronized by the caller.
+ *
+ * @param vertex The task to run.
+ * @return The instance to run the vertex on, or {@code null} if no instance is available.
+ */
+ protected Instance getFreeInstanceForTask(ExecutionVertex vertex) {
+ if (this.instancesWithAvailableResources.isEmpty()) {
+ return null;
+ }
+
+ Iterable<Instance> locationsIterable = vertex.getPreferredLocations();
+ Iterator<Instance> locations = locationsIterable == null ? null : locationsIterable.iterator();
+
+ if (locations != null && locations.hasNext()) {
+
+ while (locations.hasNext()) {
+ Instance location = locations.next();
+
+ if (location != null && this.instancesWithAvailableResources.remove(location)) {
+
+ localizedAssignments++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + location);
+ }
+
+ return location;
+ }
+ }
+
+ Instance instance = this.instancesWithAvailableResources.poll();
+ nonLocalizedAssignments++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instance);
+ }
+ return instance;
+ }
+ else {
+ Instance instance = this.instancesWithAvailableResources.poll();
+ unconstrainedAssignments++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instance);
+ }
+
+ return instance;
+ }
+ }
+
+ @Override
+ public void newSlotAvailable(final Instance instance) {
+
+ // WARNING: The asynchrony here is necessary, because we cannot guarantee the order
+ // of lock acquisition (global scheduler, instance); a synchronous call could lead to deadlocks:
+ //
+ // -> The scheduler needs to grab (1) the global scheduler lock and
+ // (2) the slot/instance lock
+ // -> The slot releasing grabs (1) the slot/instance lock (for releasing) and
+ // (2) the scheduler lock (to check whether to take a new task item)
+ //
+ // that leads to deadlocks with high probability when scheduling fast
+
+ if (this.executor != null) {
+ this.executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ handleNewSlot(instance);
+ }
+ });
+ }
+ else {
+ // for tests, we use the synchronous variant
+ handleNewSlot(instance);
+ }
+ }
+
+ private void handleNewSlot(Instance instance) {
+
+ synchronized (globalLock) {
+ QueuedTask queued = taskQueue.peek();
+
+ // the slot was properly released, we can allocate a new one from that instance
+
+ if (queued != null) {
+ ScheduledUnit task = queued.getTask();
+ ExecutionVertex vertex = task.getTaskToExecute().getVertex();
+
+ try {
+ AllocatedSlot newSlot = instance.allocateSlot(vertex.getJobId());
+ if (newSlot != null) {
+
+ // success, remove from the task queue and notify the future
+ taskQueue.poll();
+ if (queued.getFuture() != null) {
+ try {
+ queued.getFuture().setSlot(newSlot);
+ }
+ catch (Throwable t) {
+ LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t);
+ task.getTaskToExecute().fail(t);
+ }
+ }
+ }
+ }
+ catch (InstanceDiedException e) {
+ this.allInstances.remove(instance);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Instance " + instance + " was marked dead asynchronously.");
+ }
+ }
+ }
+ else {
+ this.instancesWithAvailableResources.add(instance);
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Instance Availability
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void newInstanceAvailable(Instance instance) {
+ if (instance == null) {
+ throw new IllegalArgumentException();
+ }
+ if (instance.getNumberOfAvailableSlots() <= 0) {
+ throw new IllegalArgumentException("The given instance has no resources.");
+ }
+ if (!instance.isAlive()) {
+ throw new IllegalArgumentException("The instance is not alive.");
+ }
+
+ // synchronize globally for instance changes
+ synchronized (this.globalLock) {
+
+ // check we do not already use this instance
+ if (!this.allInstances.add(instance)) {
+ throw new IllegalArgumentException("The instance is already contained.");
+ }
+
+ try {
+ instance.setSlotAvailabilityListener(this);
+ }
+ catch (IllegalStateException e) {
+ this.allInstances.remove(instance);
+ LOG.error("Scheduler could not attach to the instance as a listener.");
+ }
+
+
+ // add it to the available resources and let potential waiters know
+ this.instancesWithAvailableResources.add(instance);
+
+ for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
+ newSlotAvailable(instance);
+ }
+ }
+ }
+
+ @Override
+ public void instanceDied(Instance instance) {
+ if (instance == null) {
+ throw new IllegalArgumentException();
+ }
+
+ instance.markDead();
+
+ // we only remove the instance from the scheduler's pools here
+ synchronized (this.globalLock) {
+ // the instance must not be available anywhere any more
+ this.allInstances.remove(instance);
+ this.instancesWithAvailableResources.remove(instance);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Status reporting
+ // --------------------------------------------------------------------------------------------
+
+ public int getNumberOfAvailableInstances() {
+ return allInstances.size();
+ }
+
+ public int getNumberOfInstancesWithAvailableSlots() {
+ return instancesWithAvailableResources.size();
+ }
+
+ public int getNumberOfUnconstrainedAssignments() {
+ return unconstrainedAssignments;
+ }
+
+ public int getNumberOfLocalizedAssignments() {
+ return localizedAssignments;
+ }
+
+ public int getNumberOfNonLocalizedAssignments() {
+ return nonLocalizedAssignments;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class QueuedTask {
+
+ private final ScheduledUnit task;
+
+ private final SlotAllocationFuture future;
+
+
+ public QueuedTask(ScheduledUnit task, SlotAllocationFuture future) {
+ this.task = task;
+ this.future = future;
+ }
+
+ public ScheduledUnit getTask() {
+ return task;
+ }
+
+ public SlotAllocationFuture getFuture() {
+ return future;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/25acb6ba/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7b692e3..6ac7c54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -185,11 +185,15 @@ public final class Task {
while (true) {
ExecutionState current = this.executionState;
- if (current == ExecutionState.CANCELED || current == ExecutionState.CANCELING) {
+ // if canceled, fine. we are done, and the jobmanager has been told
+ if (current == ExecutionState.CANCELED) {
return;
}
- if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
+ // if canceling, we are done, but we cannot be sure that the jobmanager has been told.
+ // after all, we may have recognized our failure state before the cancelling and never sent a canceled
+ // message back
+ else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
notifyObservers(ExecutionState.FAILED, ExceptionUtils.stringifyException(error));
taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.FAILED, error);
return;
@@ -201,16 +205,18 @@ public final class Task {
while (true) {
ExecutionState current = this.executionState;
- // if the task is already canceled (or canceling) or finished, then we
- // need not do anything
+ // if the task is already canceled (or canceling) or finished or failed,
+ // then we need not do anything
if (current == ExecutionState.FINISHED || current == ExecutionState.CANCELED ||
- current == ExecutionState.CANCELING) {
+ current == ExecutionState.CANCELING || current == ExecutionState.FAILED)
+ {
return;
}
if (current == ExecutionState.DEPLOYING) {
// directly set to canceled
if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+
notifyObservers(ExecutionState.CANCELED, null);
taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null);
return;
@@ -219,6 +225,7 @@ public final class Task {
else if (current == ExecutionState.RUNNING) {
// go to canceling and perform the actual task canceling
if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELING)) {
+
notifyObservers(ExecutionState.CANCELING, null);
try {
this.environment.cancelExecution();
@@ -236,9 +243,21 @@ public final class Task {
}
public void cancelingDone() {
- if (STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) {
- notifyObservers(ExecutionState.CANCELED, null);
- taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null);
+ while (true) {
+ ExecutionState current = this.executionState;
+
+ if (current == ExecutionState.CANCELED) {
+ return;
+ }
+ if (!(current == ExecutionState.RUNNING || current == ExecutionState.CANCELING)) {
+ LOG.error(String.format("Unexpected state transition in Task: %s -> %s", current, ExecutionState.CANCELED));
+ }
+
+ if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) {
+ notifyObservers(ExecutionState.CANCELED, null);
+ taskManager.notifyExecutionStateChange(jobId, executionId, ExecutionState.CANCELED, null);
+ return;
+ }
}
}