You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/03 17:04:12 UTC
[4/5] git commit: Implement coarse-grained fault tolerance
Implement coarse-grained fault tolerance
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd687bc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd687bc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd687bc6
Branch: refs/heads/master
Commit: dd687bc6729d9539e05db9761e22a2aadc707341
Parents: 2557832
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 5 20:48:56 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:14 2014 +0100
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 1 +
.../java/org/apache/flink/api/common/Plan.java | 30 +++
.../flink/configuration/ConfigConstants.java | 12 +-
.../flink/api/java/ExecutionEnvironment.java | 28 +++
.../flink/runtime/executiongraph/Execution.java | 24 +-
.../runtime/executiongraph/ExecutionEdge.java | 18 +-
.../runtime/executiongraph/ExecutionGraph.java | 75 +++++-
.../executiongraph/ExecutionJobVertex.java | 54 +++-
.../runtime/executiongraph/ExecutionVertex.java | 17 +-
.../apache/flink/runtime/instance/Instance.java | 4 +
.../runtime/io/network/ChannelManager.java | 16 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 31 +++
.../flink/runtime/jobgraph/JobStatus.java | 5 +-
.../flink/runtime/jobmanager/JobManager.java | 15 +-
.../scheduler/CoLocationConstraint.java | 6 +
.../jobmanager/scheduler/CoLocationGroup.java | 11 +-
.../jobmanager/scheduler/SlotSharingGroup.java | 9 +
.../BlobLibraryCacheManagerTest.java | 15 +-
.../ExecutionGraphDeploymentTest.java | 3 -
.../ExecutionGraphRestartTest.java | 127 ++++++++++
.../executiongraph/ExecutionGraphTestUtils.java | 6 +-
.../ExecutionStateProgressTest.java | 3 -
.../ExecutionVertexCancelTest.java | 3 -
.../ExecutionVertexDeploymentTest.java | 4 -
.../runtime/jobgraph/JobManagerTestUtils.java | 8 +
.../jobmanager/CoLocationConstraintITCase.java | 4 -
.../runtime/jobmanager/JobManagerITCase.java | 4 -
.../runtime/jobmanager/RecoveryITCase.java | 247 +++++++++++++++++++
.../runtime/jobmanager/SlotSharingITCase.java | 4 -
.../jobmanager/TaskManagerFailsITCase.java | 3 -
.../jobmanager/tasks/ReceiverBlockingOnce.java | 52 ++++
.../jobmanager/tasks/ReceiverFailingOnce.java | 50 ++++
.../flink/api/scala/ExecutionEnvironment.scala | 16 ++
33 files changed, 828 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 3dd9685..d5f9b94 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -183,6 +183,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// create the jobgraph object
JobGraph graph = new JobGraph(program.getJobName());
+ graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
graph.setAllowQueuedScheduling(false);
// add vertices to the graph
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index f7e93b4..f299ef4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -67,6 +67,11 @@ public class Plan implements Visitable<Operator<?>> {
* The default parallelism to use for nodes that have no explicitly specified parallelism.
*/
protected int defaultParallelism = DEFAULT_PARALELLISM;
+
+ /**
+ * The number of times failed tasks are re-executed.
+ */
+ protected int numberOfExecutionRetries;
/**
* Hash map for files in the distributed cache: registered name to cache entry.
@@ -259,6 +264,31 @@ public class Plan implements Visitable<Operator<?>> {
}
/**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+ */
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+     // -1 is the marker for "use the system default"; anything below it is rejected
+     final boolean acceptable = numberOfExecutionRetries >= -1;
+     if (!acceptable) {
+         throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+     }
+
+     this.numberOfExecutionRetries = numberOfExecutionRetries;
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of {@code -1} indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ */
+ public int getNumberOfExecutionRetries() {
+     // returned as stored; -1 stands for "use the configured system default"
+     return this.numberOfExecutionRetries;
+ }
+
+ /**
* Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
* for data types and is specific to a particular data model (record, tuple, Scala, ...)
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 05b7047..75ebe54 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.configuration;
/**
@@ -36,6 +35,12 @@ public final class ConfigConstants {
*/
public static final String DEFAULT_PARALLELIZATION_DEGREE_KEY = "parallelization.degree.default";
+ /**
+ * Config parameter for the number of re-tries for failed tasks. Setting this
+ * value to 0 effectively disables fault tolerance.
+ */
+ public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
+
// -------------------------------- Runtime -------------------------------
/**
@@ -313,6 +318,11 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_PARALLELIZATION_DEGREE = 1;
+ /**
+ * The default number of execution retries.
+ */
+ public static final int DEFAULT_EXECUTION_RETRIES = 0;
+
// ------------------------------ Runtime ---------------------------------
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 54e36c0..6b95ad8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -96,6 +96,8 @@ public abstract class ExecutionEnvironment {
private int degreeOfParallelism = -1;
+ private int numberOfExecutionRetries = -1;
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
@@ -144,6 +146,31 @@ public abstract class ExecutionEnvironment {
}
/**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+ */
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+     if (numberOfExecutionRetries >= -1) {
+         this.numberOfExecutionRetries = numberOfExecutionRetries;
+         return;
+     }
+
+     // values below the -1 "use system default" marker are never valid
+     throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of {@code -1} indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ */
+ public int getNumberOfExecutionRetries() {
+     // starts out as -1 (system default) until a value is set explicitly
+     return this.numberOfExecutionRetries;
+ }
+
+ /**
* Gets the UUID by which this environment is identified. The UUID sets the execution context
* in the cluster or local environment.
*
@@ -652,6 +679,7 @@ public abstract class ExecutionEnvironment {
if (getDegreeOfParallelism() > 0) {
plan.setDefaultParallelism(getDegreeOfParallelism());
}
+ plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries);
try {
registerCachedFilesWithPlan(plan);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2f881d7..3cc6b02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -391,13 +391,13 @@ public class Execution {
if (transitionState(current, FINISHED)) {
try {
- vertex.executionFinished();
- return;
+ assignedResource.releaseSlot();
+ vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.getExecutionGraph().deregisterExecution(this);
- assignedResource.releaseSlot();
+ vertex.executionFinished();
}
+ return;
}
}
else if (current == CANCELING) {
@@ -433,14 +433,14 @@ public class Execution {
if (current == CANCELED) {
return;
}
- else if (current == CANCELING || current == RUNNING) {
+ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
if (transitionState(current, CANCELED)) {
try {
- vertex.executionCanceled();
+ assignedResource.releaseSlot();
+ vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.getExecutionGraph().deregisterExecution(this);
- assignedResource.releaseSlot();
+ vertex.executionCanceled();
}
return;
}
@@ -493,13 +493,13 @@ public class Execution {
this.failureCause = t;
try {
- vertex.getExecutionGraph().deregisterExecution(this);
- vertex.executionFailed(t);
- }
- finally {
if (assignedResource != null) {
assignedResource.releaseSlot();
}
+ vertex.getExecutionGraph().deregisterExecution(this);
+ }
+ finally {
+ vertex.executionFailed(t);
}
if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
index 918a0ca..92ca394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
@@ -28,9 +28,9 @@ public class ExecutionEdge {
private final int inputNum;
- private final ChannelID inputChannelId;
+ private ChannelID inputChannelId;
- private final ChannelID outputChannelId;
+ private ChannelID outputChannelId;
public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum) {
@@ -42,15 +42,6 @@ public class ExecutionEdge {
this.outputChannelId = new ChannelID();
}
- public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum, ChannelID inputChannelId, ChannelID outputChannelId) {
- this.source = source;
- this.target = target;
- this.inputNum = inputNum;
-
- this.inputChannelId = inputChannelId;
- this.outputChannelId = outputChannelId;
- }
-
public IntermediateResultPartition getSource() {
return source;
@@ -71,4 +62,9 @@ public class ExecutionEdge {
public ChannelID getOutputChannelId() {
return outputChannelId;
}
+
+ /**
+  * Replaces both channel IDs of this edge with newly created ones.
+  */
+ public void assignNewChannelIDs() {
+     // input side first, then output side
+     this.inputChannelId = new ChannelID();
+     this.outputChannelId = new ChannelID();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1954d70..9a33dbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
@@ -106,6 +105,8 @@ public class ExecutionGraph {
private int nextVertexToFinish;
+ private int numberOfRetriesLeft;
+
private volatile JobStatus state = JobStatus.CREATED;
private volatile Throwable failureCause;
@@ -147,6 +148,17 @@ public class ExecutionGraph {
// --------------------------------------------------------------------------------------------
+ /**
+  * Sets how many more times this execution graph may be restarted after a failure.
+  *
+  * @param numberOfRetriesLeft The number of remaining restart attempts; must not be smaller than -1.
+  * @throws IllegalArgumentException Thrown, if the given value is smaller than -1.
+  */
+ public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
+     if (numberOfRetriesLeft < -1) {
+         // report the offending value so that a misconfiguration can be traced back
+         throw new IllegalArgumentException("The number of retries left must not be smaller than -1, but was " + numberOfRetriesLeft);
+     }
+     this.numberOfRetriesLeft = numberOfRetriesLeft;
+ }
+
+ /**
+  * Gets the number of restart attempts that this execution graph has left.
+  *
+  * @return The remaining number of restart attempts.
+  */
+ public int getNumberOfRetriesLeft() {
+     return this.numberOfRetriesLeft;
+ }
+
public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -344,8 +356,14 @@ public class ExecutionGraph {
+ /**
+  * Blocks until the job has reached a terminal state, or until the timeout has elapsed.
+  *
+  * @param timeout The maximum time to wait, in milliseconds. A value of zero waits without a deadline.
+  * @throws InterruptedException Thrown, if the waiting thread is interrupted.
+  */
public void waitForJobEnd(long timeout) throws InterruptedException {
synchronized (progressLock) {
- while (nextVertexToFinish < verticesInCreationOrder.size()) {
- progressLock.wait(timeout);
+
+ long now = System.currentTimeMillis();
+ // zero means "no deadline"; very large timeouts are clamped, because "now + timeout"
+ // would otherwise overflow into a negative deadline and return immediately
+ long deadline = (timeout == 0 || timeout > Long.MAX_VALUE - now) ? Long.MAX_VALUE : now + timeout;
+
+ // re-check state and remaining time after every wake-up (covers spurious wake-ups)
+ while (now < deadline && !state.isTerminalState()) {
+ progressLock.wait(deadline - now);
+ now = System.currentTimeMillis();
}
}
}
@@ -403,8 +421,21 @@ public class ExecutionGraph {
if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
break;
}
- if (current == JobStatus.FAILING && transitionState(current, JobStatus.FAILED, failureCause)) {
- break;
+ if (current == JobStatus.FAILING) {
+ if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
+ numberOfRetriesLeft--;
+
+ execute(new Runnable() {
+ @Override
+ public void run() {
+ restart();
+ }
+ });
+ break;
+ }
+ else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+ break;
+ }
}
if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
fail(new Exception("ExecutionGraph went into final state from state " + current));
@@ -659,4 +690,38 @@ public class ExecutionGraph {
action.run();
}
}
+
+ /**
+  * Resets this execution graph and schedules it for execution again.
+  *
+  * <p>The graph must be in state RESTARTING (a graph in state FAILED is first moved to
+  * RESTARTING). All job vertices are reset, the state timestamps are cleared, and the
+  * graph is put back into state CREATED before it is handed to the scheduler that was
+  * used before. Any error during this procedure is passed to {@code fail(Throwable)}.
+  */
+ public void restart() {
+     try {
+         // a graph that already reached FAILED is moved to RESTARTING first; whether the
+         // transition succeeded is verified by the state check under the lock below
+         if (state == JobStatus.FAILED) {
+             transitionState(JobStatus.FAILED, JobStatus.RESTARTING);
+         }
+         synchronized (progressLock) {
+             if (state != JobStatus.RESTARTING) {
+                 throw new IllegalStateException("Can only restart job from state restarting.");
+             }
+             if (scheduler == null) {
+                 throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
+             }
+
+             // drop the bookkeeping of the previous attempt before the vertices are reset
+             this.currentExecutions.clear();
+             this.edges.clear();
+
+             for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
+                 jv.resetForNewExecution();
+             }
+
+             for (int i = 0; i < stateTimestamps.length; i++) {
+                 stateTimestamps[i] = 0;
+             }
+             nextVertexToFinish = 0;
+             transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
+         }
+
+         // scheduling happens outside the progress lock
+         scheduleForExecution(scheduler);
+     }
+     catch (Throwable t) {
+         fail(t);
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 37a1893..73534f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -56,19 +56,20 @@ public class ExecutionJobVertex {
private final List<IntermediateResult> inputs;
- private final InputSplitAssigner splitAssigner;
-
private final int parallelism;
private final boolean[] finishedSubtasks;
private volatile int numSubtasksInFinalState;
-
private final SlotSharingGroup slotSharingGroup;
private final CoLocationGroup coLocationGroup;
+ private final InputSplit[] inputSplits;
+
+ private InputSplitAssigner splitAssigner;
+
public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex, int defaultParallelism) throws JobException {
this(graph, jobVertex, defaultParallelism, System.currentTimeMillis());
@@ -126,9 +127,10 @@ public class ExecutionJobVertex {
@SuppressWarnings("unchecked")
InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
if (splitSource != null) {
- InputSplit[] splits = splitSource.createInputSplits(numTaskVertices);
- this.splitAssigner = splitSource.getInputSplitAssigner(splits);
+ this.inputSplits = splitSource.createInputSplits(numTaskVertices);
+ this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
} else {
+ this.inputSplits = null;
this.splitAssigner = null;
}
}
@@ -259,6 +261,48 @@ public class ExecutionJobVertex {
}
}
+ /**
+  * Resets this job vertex and all of its subtasks so that it can be executed again.
+  * Only permitted while either none or all of the subtasks are in a final state.
+  */
+ public void resetForNewExecution() {
+     if (numSubtasksInFinalState != 0 && numSubtasksInFinalState != parallelism) {
+         throw new IllegalStateException("Cannot reset vertex that is not in final state");
+     }
+
+     synchronized (stateMonitor) {
+         // clear the scheduler hints held by the sharing groups
+         if (slotSharingGroup != null) {
+             slotSharingGroup.clearTaskAssignment();
+         }
+         if (coLocationGroup != null) {
+             coLocationGroup.resetConstraints();
+         }
+
+         // subtasks are reset one at a time and the bookkeeping is updated right after
+         // each of them, so the counters stay consistent if a reset fails part-way through
+         for (int subtask = 0; subtask < parallelism; subtask++) {
+             taskVertices[subtask].resetForNewExecution();
+
+             if (!finishedSubtasks[subtask]) {
+                 continue;
+             }
+             finishedSubtasks[subtask] = false;
+             numSubtasksInFinalState--;
+         }
+
+         if (numSubtasksInFinalState != 0) {
+             throw new RuntimeException("Bug: resetting the execution job vertex failed.");
+         }
+
+         // without stored input splits there is no split assigner to re-create
+         if (this.inputSplits == null) {
+             return;
+         }
+
+         try {
+             @SuppressWarnings("unchecked")
+             InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
+             this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
+         }
+         catch (Throwable t) {
+             throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
+         }
+     }
+ }
+
//---------------------------------------------------------------------------------------------
// Notifications
//---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3ea1afc..26dd19e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -67,7 +67,7 @@ public class ExecutionVertex {
private final List<Execution> priorExecutions;
- private final CoLocationConstraint locationConstraint;
+ private volatile CoLocationConstraint locationConstraint;
private volatile Execution currentExecution; // this field must never be null
@@ -316,6 +316,21 @@ public class ExecutionVertex {
if (state == FINISHED || state == CANCELED || state == FAILED) {
priorExecutions.add(execution);
currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis());
+
+ CoLocationGroup grp = jobVertex.getCoLocationGroup();
+ if (grp != null) {
+ this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
+ }
+
+ // temp: assign new channel IDs.
+ ExecutionGraph graph = getExecutionGraph();
+
+ for (ExecutionEdge[] input : this.inputEdges) {
+ for (ExecutionEdge e : input) {
+ e.assignNewChannelIDs();
+ graph.registerExecutionEdge(e);
+ }
+ }
}
else {
throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 88450c2..0cafcec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -157,6 +157,10 @@ public class Instance {
// --------------------------------------------------------------------------------------------
public TaskOperationProtocol getTaskManagerProxy() throws IOException {
+ if (isDead) {
+ throw new IOException("Instance has died");
+ }
+
TaskOperationProtocol tm = this.taskManager;
if (tm == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index e48f3af..5f302e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -396,7 +396,9 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
}
}
- this.receiverCache.put(sourceChannelID, receiverList);
+ if (channels.containsKey(sourceChannelID)) {
+ this.receiverCache.put(sourceChannelID, receiverList);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Receiver for %s: %s [%s])",
@@ -659,4 +661,16 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
}
}
}
+
+ /**
+  * Checks that no channels, local buffer pools, or receiver cache entries are left.
+  *
+  * @throws IllegalStateException Thrown, if any of the three structures is not empty.
+  */
+ public void verifyAllCachesEmpty() {
+     // the structures are checked in a fixed order; the first non-empty one is reported
+     String violation = null;
+     if (!channels.isEmpty()) {
+         violation = "Channel manager caches not empty: There are still registered channels.";
+     }
+     else if (!localBuffersPools.isEmpty()) {
+         violation = "Channel manager caches not empty: There are still local buffer pools.";
+     }
+     else if (!receiverCache.isEmpty()) {
+         violation = "Channel manager caches not empty: There are still entries in the receiver cache.";
+     }
+
+     if (violation != null) {
+         throw new IllegalStateException(violation);
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index c42bf92..4a8ca11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -71,6 +71,10 @@ public class JobGraph implements IOReadableWritable {
/** Name of this job. */
private String jobName;
+ /** The number of times that failed tasks should be re-executed */
+ private int numExecutionRetries;
+
+ /** flag to enable queued scheduling */
private boolean allowQueuedScheduling;
// --------------------------------------------------------------------------------------------
@@ -165,6 +169,31 @@ public class JobGraph implements IOReadableWritable {
return this.jobConfiguration;
}
+ /**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+ */
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+     // the smallest legal value is -1, which selects the system default
+     final int lowestLegalValue = -1;
+
+     if (numberOfExecutionRetries < lowestLegalValue) {
+         throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+     }
+     this.numExecutionRetries = numberOfExecutionRetries;
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of {@code -1} indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ */
+ public int getNumberOfExecutionRetries() {
+     // -1 stands for "use the configured system default"
+     return this.numExecutionRetries;
+ }
+
public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
this.allowQueuedScheduling = allowQueuedScheduling;
}
@@ -318,6 +347,7 @@ public class JobGraph implements IOReadableWritable {
this.jobID.read(in);
this.jobName = StringValue.readString(in);
this.jobConfiguration.read(in);
+ this.numExecutionRetries = in.readInt();
this.allowQueuedScheduling = in.readBoolean();
final int numVertices = in.readInt();
@@ -347,6 +377,7 @@ public class JobGraph implements IOReadableWritable {
this.jobID.write(out);
StringValue.writeString(this.jobName, out);
this.jobConfiguration.write(out);
+ out.writeInt(numExecutionRetries);
out.writeBoolean(allowQueuedScheduling);
// write the task vertices using java serialization (to resolve references in the object graph)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 857d999..3722945 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -42,7 +42,10 @@ public enum JobStatus {
CANCELED(true),
/** All of the job's tasks have successfully finished. */
- FINISHED(true);
+ FINISHED(true),
+
+ /** The job is currently undergoing a reset and total restart */
+ RESTARTING(false);
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index c93eee3..5a32244 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -115,8 +115,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
/** Executor service for asynchronous commands (to relieve the RPC threads of work) */
- private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware
- .getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
+ private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
/** The RPC end point through which the JobManager gets its calls */
@@ -140,7 +139,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
private final int recommendedClientPollingInterval;
// end: these will be consolidated / removed
-
+
+ private final int defaultExecutionRetries;
+
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
private volatile boolean isShutDown;
@@ -173,6 +174,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
// Read the suggested client polling interval
this.recommendedClientPollingInterval = GlobalConfiguration.getInteger(
ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL);
+
+ // read the default number of times that failed tasks should be re-executed
+ this.defaultExecutionRetries = GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
// Load the job progress collector
this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
@@ -326,6 +331,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
executionGraph = new ExecutionGraph(job.getJobID(), job.getName(),
job.getJobConfiguration(), job.getUserJarBlobKeys(), this.executorService);
+
+ executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
+ job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
+
ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
if (previous != null) {
throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index f554bbb..36430de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -63,7 +63,13 @@ public class CoLocationConstraint {
return this.sharedSlot == null;
}
+ public boolean isUnassignedOrDisposed() {
+ return this.sharedSlot == null || this.sharedSlot.isDisposed();
+ }
+
public AbstractID getGroupId() {
return this.group.getId();
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index d1c3bd5..fa379cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -74,7 +74,7 @@ public class CoLocationGroup implements java.io.Serializable {
return constraints.get(subtask);
}
- public void ensureConstraints(int num) {
+ private void ensureConstraints(int num) {
if (constraints == null) {
constraints = new ArrayList<CoLocationConstraint>(num);
} else {
@@ -92,4 +92,13 @@ public class CoLocationGroup implements java.io.Serializable {
public AbstractID getId() {
return id;
}
+
+ public void resetConstraints() {
+ for (CoLocationConstraint c : this.constraints) {
+ if (!c.isUnassignedOrDisposed()) {
+ throw new IllegalStateException("Cannot reset co-location group: some constraints still have executing vertices.");
+ }
+ }
+ this.constraints.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index c5a88f3..dcde6b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -70,6 +70,15 @@ public class SlotSharingGroup implements java.io.Serializable {
return this.taskAssignment;
}
+ public void clearTaskAssignment() {
+ if (this.taskAssignment != null) {
+ if (this.taskAssignment.getNumberOfSlots() > 0) {
+ throw new IllegalStateException("SlotSharingGroup cannot clear task assignment, group still has allocated resources.");
+ }
+ }
+ this.taskAssignment = null;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 606fff1..df32a81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -21,11 +21,9 @@ package org.apache.flink.runtime.execution.librarycache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.jobgraph.JobID;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -33,7 +31,6 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -90,11 +87,15 @@ public class BlobLibraryCacheManagerTest {
}
assertEquals(2, caughtExceptions);
- }catch(Exception e){
+
+ bc.close();
+ }
+ catch(Exception e){
e.printStackTrace();
fail(e.getMessage());
- }finally{
- if(server != null){
+ }
+ finally{
+ if (server != null){
try {
server.shutdown();
} catch (IOException e) {
@@ -102,7 +103,7 @@ public class BlobLibraryCacheManagerTest {
}
}
- if(libraryCacheManager != null){
+ if (libraryCacheManager != null){
try {
libraryCacheManager.shutdown();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 74ab08b..4cddcbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -27,17 +27,14 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doAnswer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
new file mode 100644
index 0000000..f1855f2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.junit.Test;
+
+public class ExecutionGraphRestartTest {
+
+ @Test
+ public void testRestartManually() {
+ final int NUM_TASKS = 31;
+
+ try {
+ TaskOperationProtocol tm = getSimpleAcknowledgingTaskmanager();
+ Instance instance = getInstance(tm);
+
+ Scheduler scheduler = new Scheduler();
+ scheduler.newInstanceAvailable(instance);
+
+ // The job:
+
+ final AbstractJobVertex sender = new AbstractJobVertex("Task");
+ sender.setInvokableClass(NoOpInvokable.class);
+ sender.setParallelism(NUM_TASKS);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender);
+
+ ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration());
+ eg.setNumberOfRetriesLeft(0);
+ eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ assertEquals(JobStatus.CREATED, eg.getState());
+
+ eg.scheduleForExecution(scheduler);
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+ assertEquals(JobStatus.FAILED, eg.getState());
+
+ eg.restart();
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+ v.executionFinished();
+ }
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRestartSelf() {
+ final int NUM_TASKS = 31;
+
+ try {
+ TaskOperationProtocol tm = getSimpleAcknowledgingTaskmanager();
+ Instance instance = getInstance(tm);
+
+ Scheduler scheduler = new Scheduler();
+ scheduler.newInstanceAvailable(instance);
+
+ // The job:
+
+ final AbstractJobVertex sender = new AbstractJobVertex("Task");
+ sender.setInvokableClass(NoOpInvokable.class);
+ sender.setParallelism(NUM_TASKS);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender);
+
+ ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration());
+ eg.setNumberOfRetriesLeft(1);
+ eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+ assertEquals(JobStatus.CREATED, eg.getState());
+
+ eg.scheduleForExecution(scheduler);
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+
+ // should have restarted itself
+ assertEquals(JobStatus.RUNNING, eg.getState());
+
+ for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+ v.executionFinished();
+ }
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 7eefa7e..30b05ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -96,11 +96,15 @@ public class ExecutionGraphTestUtils {
// --------------------------------------------------------------------------------------------
public static Instance getInstance(final TaskOperationProtocol top) throws Exception {
+ return getInstance(top, 1);
+ }
+
+ public static Instance getInstance(final TaskOperationProtocol top, int numSlots) throws Exception {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10000, 10001);
- return new Instance(connection, new InstanceID(), hardwareDescription, 1) {
+ return new Instance(connection, new InstanceID(), hardwareDescription, numSlots) {
@Override
public TaskOperationProtocol getTaskManagerProxy() {
return top;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f5a4d39..2848466 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -22,12 +22,9 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
-import java.util.ArrayList;
import java.util.Arrays;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobID;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 1f74ae3..9769529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,12 +26,9 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.IOException;
-import java.util.ArrayList;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index efb2af4..f3081bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -28,10 +28,8 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Matchers.any;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;
@@ -43,8 +41,6 @@ import org.junit.Test;
import org.mockito.Matchers;
-import java.util.ArrayList;
-
public class ExecutionVertexDeploymentTest {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 8ed7a6d..168c454 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -36,6 +36,10 @@ public class JobManagerTestUtils {
}
public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager) throws Exception {
+ return startJobManager(numTaskManagers, numSlotsPerTaskManager, null);
+ }
+
+ public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager, Configuration additionalParams) throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
@@ -43,6 +47,10 @@ public class JobManagerTestUtils {
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+ if (additionalParams != null) {
+ cfg.addAll(additionalParams);
+ }
+
GlobalConfiguration.includeConfiguration(cfg);
JobManager jm = new JobManager(ExecutionMode.LOCAL);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
index f8b229f..23a75cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
@@ -22,10 +22,8 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -38,8 +36,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.junit.Test;
-import java.util.ArrayList;
-
public class CoLocationConstraintITCase {
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 0952f60..ae7857f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -24,10 +24,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.api.RecordReader;
@@ -47,8 +45,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.apache.flink.runtime.types.IntegerRecord;
import org.junit.Test;
-import java.util.ArrayList;
-
/**
* This test is intended to cover the basic functionality of the {@link JobManager}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
new file mode 100644
index 0000000..0b8518f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.ReceiverBlockingOnce;
+import org.apache.flink.runtime.jobmanager.tasks.ReceiverFailingOnce;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.junit.Test;
+
+/**
+ * Tests that jobs configured with execution retries recover and finish, including after a task manager is shut down.
+ */
+public class RecoveryITCase {
+
+ @Test
+ public void testForwardJob() {
+
+ ReceiverFailingOnce.resetFailedBefore();
+
+ final int NUM_TASKS = 31;
+
+ JobManager jm = null;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(ReceiverFailingOnce.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+ jobGraph.setNumberOfExecutionRetries(1);
+
+ jm = startJobManager(2 * NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ if (jm != null) {
+ jm.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testForwardJobWithSlotSharing() {
+
+ ReceiverFailingOnce.resetFailedBefore();
+
+ final int NUM_TASKS = 31;
+
+ JobManager jm = null;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(ReceiverFailingOnce.class);
+
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ SlotSharingGroup sharingGroup = new SlotSharingGroup();
+ sender.setSlotSharingGroup(sharingGroup);
+ receiver.setSlotSharingGroup(sharingGroup);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+ jobGraph.setNumberOfExecutionRetries(1);
+
+ jm = startJobManager(NUM_TASKS);
+
+ final GlobalBufferPool bp = ((LocalInstanceManager) jm.getInstanceManager())
+ .getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ // make sure that in any case, the network buffers are all returned
+ waitForTaskThreadsToBeTerminated();
+ assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ if (jm != null) {
+ jm.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testRecoverTaskManagerFailure() {
+
+ final int NUM_TASKS = 31;
+
+ JobManager jm = null;
+
+ try {
+ final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+ final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+
+ sender.setInvokableClass(Sender.class);
+ receiver.setInvokableClass(ReceiverBlockingOnce.class);
+ sender.setParallelism(NUM_TASKS);
+ receiver.setParallelism(NUM_TASKS);
+
+ receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+ SlotSharingGroup sharingGroup = new SlotSharingGroup();
+ sender.setSlotSharingGroup(sharingGroup);
+ receiver.setSlotSharingGroup(sharingGroup);
+
+ final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+ jobGraph.setNumberOfExecutionRetries(1);
+
+ // make sure we have fast heartbeats and failure detection
+ Configuration cfg = new Configuration();
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+
+ jm = startJobManager(2, NUM_TASKS, cfg);
+
+ JobSubmissionResult result = jm.submitJob(jobGraph);
+
+ if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+ System.out.println(result.getDescription());
+ }
+ assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+ // monitor the execution
+ ExecutionGraph eg = jm.getCurrentJobs().get(jobGraph.getJobID());
+
+ // wait for a bit until all is running, make sure the second attempt does not block
+ Thread.sleep(300);
+ ReceiverBlockingOnce.setShouldNotBlock();
+
+ // shutdown one of the taskmanagers
+ ((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0].shutdown();
+
+ // wait for the recovery to do its work
+ if (eg != null) {
+ eg.waitForJobEnd();
+ assertEquals(JobStatus.FINISHED, eg.getState());
+ }
+ else {
+ // already done, that was fast;
+ }
+
+ // wait until all task threads have terminated (no network buffer check in this test)
+ waitForTaskThreadsToBeTerminated();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ if (jm != null) {
+ jm.shutdown();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
index 29293da..5d79aab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
@@ -22,10 +22,8 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -39,8 +37,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.junit.Test;
-import java.util.ArrayList;
-
public class SlotSharingITCase {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
index 19ff690..6b8be15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -22,11 +22,9 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobMana
import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
import static org.junit.Assert.*;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.LocalInstanceManager;
@@ -39,7 +37,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.junit.Test;
-import java.util.ArrayList;
public class TaskManagerFailsITCase {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
new file mode 100644
index 0000000..3425842
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class ReceiverBlockingOnce extends AbstractInvokable {
+    // Test task: parks in invoke() while shouldBlock is set, otherwise just drains its input.
+    private static volatile boolean shouldBlock = true; // volatile: set and read by different threads
+
+    private RecordReader<IntegerRecord> reader;
+    /** Creates the IntegerRecord reader that invoke() later consumes. */
+    @Override
+    public void registerInputOutput() {
+        reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+    }
+    /** Waits until interrupted if the flag is set; otherwise reads until next() returns null. */
+    @Override
+    public void invoke() throws Exception {
+        if (shouldBlock) {
+            // Nothing can notify this private monitor, so only an interrupt should end the
+            // wait; the loop keeps a spurious wakeup from releasing the task early.
+            final Object lock = new Object();
+            synchronized (lock) {
+                while (true) { lock.wait(); }
+            }
+        }
+        while (reader.next() != null); // discard records until the reader is exhausted
+    }
+    /** Clears the flag so that invoke() calls that start afterwards do not block. */
+    public static void setShouldNotBlock() {
+        shouldBlock = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
new file mode 100644
index 0000000..3fad6b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class ReceiverFailingOnce extends AbstractInvokable {
+    // Test task: subtask 0 throws once while the flag is unset; otherwise drains its input.
+    private static volatile boolean hasFailedBefore = false; // volatile: set and read by different threads
+
+    private RecordReader<IntegerRecord> reader;
+    /** Creates the IntegerRecord reader that invoke() later consumes. */
+    @Override
+    public void registerInputOutput() {
+        reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+    }
+    /** Throws once from subtask index 0, otherwise reads until next() returns null. */
+    @Override
+    public void invoke() throws Exception {
+        if (!hasFailedBefore && getEnvironment().getIndexInSubtaskGroup() == 0) {
+            hasFailedBefore = true; // record the failure first so that later runs see it
+            throw new Exception("Test exception");
+        }
+
+        while (reader.next() != null); // discard records until the reader is exhausted
+    }
+
+    /** Re-arms the one-time failure so that the next subtask-0 invoke() throws again. */
+    public static void resetFailedBefore() {
+        hasFailedBefore = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 53d57d2..ff96519 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -75,6 +75,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* value can be overridden by individual operations using [[DataSet.setParallelism]]
*/
def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+  /**
+   * Configures how often a failed task is re-executed. Zero effectively turns fault
+   * tolerance off; "-1" requests the system default (as defined in the configuration).
+   *
+   * @param numRetries the number of re-execution attempts, or -1 for the system default
+   */
+  def setNumberOfExecutionRetries(numRetries: Int): Unit =
+    javaEnv.setNumberOfExecutionRetries(numRetries)
+
+  /**
+   * Returns the number of re-execution attempts for failed tasks, as reported by the wrapped
+   * Java environment; "-1" means the system default (as defined in the configuration) applies.
+   */
+  def getNumberOfExecutionRetries =
+    javaEnv.getNumberOfExecutionRetries
/**
* Gets the UUID by which this environment is identified. The UUID sets the execution context