You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/03 17:04:09 UTC
[1/5] git commit: [FLINK-1203] Deactivate fork reuse in tests
(workaround for potential surefire bug)
Repository: incubator-flink
Updated Branches:
refs/heads/master f5898a01e -> f0fd8823e
[FLINK-1203] Deactivate fork reuse in tests (workaround for potential surefire bug)
This closes #174
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f42dcc35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f42dcc35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f42dcc35
Branch: refs/heads/master
Commit: f42dcc3573dd595c4ee11f089f7ea5d68905ebca
Parents: f5898a0
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 12:27:03 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:04:30 2014 +0100
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f42dcc35/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 60e87da..698f04e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,7 +76,7 @@ under the License.
forkCount is not exposed as a property. With this we can set
it on the "mvn" commandline in travis. -->
<flink.forkCount>1.5C</flink.forkCount>
- <flink.reuseForks>true</flink.reuseForks>
+ <flink.reuseForks>false</flink.reuseForks>
<slf4j.version>1.7.7</slf4j.version>
<guava.version>17.0</guava.version>
</properties>
[5/5] git commit: Introduce a delay before restarts to make sure that
taskmanager failures are detected before restart.
Posted by se...@apache.org.
Introduce a delay before restarts to make sure that taskmanager failures are detected before restart.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f0fd8823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f0fd8823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f0fd8823
Branch: refs/heads/master
Commit: f0fd8823ee8d157f9a5a00b2687827b94206d7b7
Parents: dd687bc
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 6 22:01:36 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:14 2014 +0100
----------------------------------------------------------------------
.../api/common/typeutils/TypeSerializer.java | 4 ++--
.../flink/configuration/ConfigConstants.java | 2 +-
.../runtime/executiongraph/ExecutionGraph.java | 18 ++++++++++++++++++
.../flink/runtime/instance/InstanceManager.java | 8 ++++++--
.../flink/runtime/jobmanager/JobManager.java | 8 ++++++++
.../runtime/instance/InstanceManagerTest.java | 12 +++++-------
.../runtime/jobgraph/JobManagerTestUtils.java | 2 ++
.../flink/runtime/jobmanager/RecoveryITCase.java | 4 ++--
8 files changed, 44 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 87d7e20..5e32c86 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -117,7 +117,7 @@ public abstract class TypeSerializer<T> implements Serializable {
* De-serializes a record from the given source input view.
*
* @param source The input view from which to read the data.
- * @result The deserialized element.
+ * @return The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
@@ -129,7 +129,7 @@ public abstract class TypeSerializer<T> implements Serializable {
*
* @param reuse The record instance into which to de-serialize the data.
* @param source The input view from which to read the data.
- * @result The deserialized element.
+ * @return The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 75ebe54..dcd9342 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -344,7 +344,7 @@ public final class ConfigConstants {
* Default number of milliseconds after which a task manager is marked as failed.
*/
// 30 seconds (it's enough to get to mars, should be enough to detect failure)
- public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30;
+ public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30*1000;
/**
* The default network port the task manager expects incoming IPC connections. The {@code -1} means that
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9a33dbf..3df3452 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -107,6 +107,8 @@ public class ExecutionGraph {
private int numberOfRetriesLeft;
+ private long delayBeforeRetrying;
+
private volatile JobStatus state = JobStatus.CREATED;
private volatile Throwable failureCause;
@@ -159,6 +161,17 @@ public class ExecutionGraph {
return numberOfRetriesLeft;
}
+ public void setDelayBeforeRetrying(long delayBeforeRetrying) {
+ if (delayBeforeRetrying < 0) {
+ throw new IllegalArgumentException("Delay before retry must be non-negative.");
+ }
+ this.delayBeforeRetrying = delayBeforeRetrying;
+ }
+
+ public long getDelayBeforeRetrying() {
+ return delayBeforeRetrying;
+ }
+
public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -428,6 +441,11 @@ public class ExecutionGraph {
execute(new Runnable() {
@Override
public void run() {
+ try {
+ Thread.sleep(delayBeforeRetrying);
+ } catch (InterruptedException e) {
+ // should only happen on shutdown
+ }
restart();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 8127f1d..ced1afe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -77,7 +77,7 @@ public class InstanceManager {
* where a task manager is still considered alive.
*/
public InstanceManager() {
- this(1000 * GlobalConfiguration.getLong(
+ this(GlobalConfiguration.getLong(
ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
}
@@ -98,6 +98,10 @@ public class InstanceManager {
new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
}
+
+ public long getHeartbeatTimeout() {
+ return heartbeatTimeout;
+ }
public void shutdown() {
synchronized (this.lock) {
@@ -126,7 +130,7 @@ public class InstanceManager {
synchronized (this.lock) {
if (this.shutdown) {
- throw new IllegalStateException("InstanceManager is shut down.");
+ return false;
}
Instance host = registeredHostsById.get(instanceId);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 5a32244..4bce4a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -142,6 +142,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
private final int defaultExecutionRetries;
+ private final long delayBetweenRetries;
+
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
private volatile boolean isShutDown;
@@ -179,6 +181,11 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
this.defaultExecutionRetries = GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+ // delay between retries is twice the heartbeat timeout, so that task manager failures are detected before the restart
+ this.delayBetweenRetries = 2 * GlobalConfiguration.getLong(
+ ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT);
+
// Load the job progress collector
this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
@@ -334,6 +341,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
+ executionGraph.setDelayBeforeRetrying(this.delayBetweenRetries);
ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
if (previous != null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 34d86b4..ecdf891 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -43,6 +43,10 @@ public class InstanceManagerTest {
try {
InstanceManager cm = new InstanceManager();
+ // catches error that some parts assumed config values in seconds, others in
+ // milliseconds by verifying that the timeout is not larger than 2 minutes.
+ assertTrue(cm.getHeartbeatTimeout() < 2 * 60 * 1000);
+
final int ipcPort = 10000;
final int dataPort = 20000;
@@ -182,13 +186,7 @@ public class InstanceManagerTest {
// expected
}
- try {
- cm.reportHeartBeat(new InstanceID());
- fail("Should raise exception in shutdown state");
- }
- catch (IllegalStateException e) {
- // expected
- }
+ assertFalse(cm.reportHeartBeat(new InstanceID()));
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 168c454..cc767c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -46,6 +46,8 @@ public class JobManagerTestUtils {
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
if (additionalParams != null) {
cfg.addAll(additionalParams);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
index 0b8518f..9e259f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
@@ -200,8 +200,8 @@ public class RecoveryITCase {
// make sure we have fast heartbeats and failure detection
Configuration cfg = new Configuration();
- cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 3000);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 1000);
jm = startJobManager(2, NUM_TASKS, cfg);
[3/5] git commit: Remove obsolete collection execution example.
Correct remote collector format example.
Posted by se...@apache.org.
Remove obsolete collection execution example.
Correct remote collector format example.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2557832a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2557832a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2557832a
Branch: refs/heads/master
Commit: 2557832af50e5d7bb479d568b370bfdd96b54fef
Parents: a747b61
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 5 21:02:33 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:13 2014 +0100
----------------------------------------------------------------------
.../CollectionExecutionExample.java | 48 --------
.../RemoteCollectorOutputFormatExample.java | 114 +++++++++++++++++++
.../RemoteCollectorOutputFormatExample.java | 114 -------------------
3 files changed, 114 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2557832a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/environments/CollectionExecutionExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/environments/CollectionExecutionExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/environments/CollectionExecutionExample.java
deleted file mode 100644
index 1ce3e7a..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/environments/CollectionExecutionExample.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.environments;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.CollectionEnvironment;
-
-/**
- * This example shows how to use Flink's collection execution functionality.
- * Collection-based execution is an extremely lightweight, non-parallel way to
- * execute programs on small data: The programs are s
- *
- * Because this method of execution spawns no background threads, managed memory,
- * coordinator, or parallel worker, it has a minimal execution footprint.
- */
-public class CollectionExecutionExample {
-
- public static void main(String[] args) throws Exception {
-
- CollectionEnvironment env = new CollectionEnvironment();
-
- env.fromElements("A", "B", "C", "D")
- .map(new MapFunction<String, String>() {
- public String map(String value) {
- return value + " " + 1;
- };
- })
- .print();
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2557832a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
new file mode 100644
index 0000000..f524718
--- /dev/null
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.misc;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.io.RemoteCollectorConsumer;
+import org.apache.flink.api.java.io.RemoteCollectorImpl;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over some sample data and collects the results with an
+ * implementation of a {@link RemoteCollectorConsumer}.
+ */
+@SuppressWarnings("serial")
+public class RemoteCollectorOutputFormatExample {
+
+ public static void main(String[] args) throws Exception {
+
+ /**
+ * We create a remote {@link ExecutionEnvironment} here, because this
+ * OutputFormat is designed for use in a distributed setting. For local
+ * use you should consider using the {@link LocalCollectionOutputFormat
+ * <T>}.
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
+ "/path/to/your/file.jar");
+
+ // get input data
+ DataSet<String> text = env.fromElements(
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,");
+
+ DataSet<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new LineSplitter())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0).aggregate(Aggregations.SUM, 1);
+
+ // emit result
+ RemoteCollectorImpl.collectLocal(counts,
+ new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
+ // user defined IRemoteCollectorConsumer
+ @Override
+ public void collect(Tuple2<String, Integer> element) {
+ System.out.println("word/occurrences:" + element);
+ }
+ });
+
+ // local collection to store results in
+ Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
+ // collect results from remote in local collection
+ RemoteCollectorImpl.collectLocal(counts, collection);
+
+ // execute program
+ env.execute("WordCount Example with RemoteCollectorOutputFormat");
+
+ System.out.println(collection);
+
+ RemoteCollectorImpl.shutdownAll();
+ }
+
+ //
+ // User Functions
+ //
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a
+ * user-defined FlatMapFunction. The function takes a line (String) and
+ * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+ * Integer>).
+ */
+ public static final class LineSplitter implements
+ FlatMapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2557832a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
deleted file mode 100644
index 36b5c82..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/remotecollectoroutputformat/RemoteCollectorOutputFormatExample.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.remotecollectoroutputformat;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.io.RemoteCollectorConsumer;
-import org.apache.flink.api.java.io.RemoteCollectorImpl;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over some sample data and collects the results with an
- * implementation of a {@link RemoteCollectorConsumer}.
- */
-@SuppressWarnings("serial")
-public class RemoteCollectorOutputFormatExample {
-
- public static void main(String[] args) throws Exception {
-
- /**
- * We create a remote {@link ExecutionEnvironment} here, because this
- * OutputFormat is designed for use in a distributed setting. For local
- * use you should consider using the {@link LocalCollectionOutputFormat
- * <T>}.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
- "/path/to/your/file.jar");
-
- // get input data
- DataSet<String> text = env.fromElements(
- "To be, or not to be,--that is the question:--",
- "Whether 'tis nobler in the mind to suffer",
- "The slings and arrows of outrageous fortune",
- "Or to take arms against a sea of troubles,");
-
- DataSet<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new LineSplitter())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).aggregate(Aggregations.SUM, 1);
-
- // emit result
- RemoteCollectorImpl.collectLocal(counts,
- new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
- // user defined IRemoteCollectorConsumer
- @Override
- public void collect(Tuple2<String, Integer> element) {
- System.out.println("word/occurrences:" + element);
- }
- });
-
- // local collection to store results in
- Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
- // collect results from remote in local collection
- RemoteCollectorImpl.collectLocal(counts, collection);
-
- // execute program
- env.execute("WordCount Example with RemoteCollectorOutputFormat");
-
- System.out.println(collection);
-
- RemoteCollectorImpl.shutdownAll();
- }
-
- //
- // User Functions
- //
-
- /**
- * Implements the string tokenizer that splits sentences into words as a
- * user-defined FlatMapFunction. The function takes a line (String) and
- * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
- * Integer>).
- */
- public static final class LineSplitter implements
- FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-}
\ No newline at end of file
[4/5] git commit: Implement coarse-grained fault tolerance
Posted by se...@apache.org.
Implement coarse-grained fault tolerance
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/dd687bc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/dd687bc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/dd687bc6
Branch: refs/heads/master
Commit: dd687bc6729d9539e05db9761e22a2aadc707341
Parents: 2557832
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 5 20:48:56 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:14 2014 +0100
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 1 +
.../java/org/apache/flink/api/common/Plan.java | 30 +++
.../flink/configuration/ConfigConstants.java | 12 +-
.../flink/api/java/ExecutionEnvironment.java | 28 +++
.../flink/runtime/executiongraph/Execution.java | 24 +-
.../runtime/executiongraph/ExecutionEdge.java | 18 +-
.../runtime/executiongraph/ExecutionGraph.java | 75 +++++-
.../executiongraph/ExecutionJobVertex.java | 54 +++-
.../runtime/executiongraph/ExecutionVertex.java | 17 +-
.../apache/flink/runtime/instance/Instance.java | 4 +
.../runtime/io/network/ChannelManager.java | 16 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 31 +++
.../flink/runtime/jobgraph/JobStatus.java | 5 +-
.../flink/runtime/jobmanager/JobManager.java | 15 +-
.../scheduler/CoLocationConstraint.java | 6 +
.../jobmanager/scheduler/CoLocationGroup.java | 11 +-
.../jobmanager/scheduler/SlotSharingGroup.java | 9 +
.../BlobLibraryCacheManagerTest.java | 15 +-
.../ExecutionGraphDeploymentTest.java | 3 -
.../ExecutionGraphRestartTest.java | 127 ++++++++++
.../executiongraph/ExecutionGraphTestUtils.java | 6 +-
.../ExecutionStateProgressTest.java | 3 -
.../ExecutionVertexCancelTest.java | 3 -
.../ExecutionVertexDeploymentTest.java | 4 -
.../runtime/jobgraph/JobManagerTestUtils.java | 8 +
.../jobmanager/CoLocationConstraintITCase.java | 4 -
.../runtime/jobmanager/JobManagerITCase.java | 4 -
.../runtime/jobmanager/RecoveryITCase.java | 247 +++++++++++++++++++
.../runtime/jobmanager/SlotSharingITCase.java | 4 -
.../jobmanager/TaskManagerFailsITCase.java | 3 -
.../jobmanager/tasks/ReceiverBlockingOnce.java | 52 ++++
.../jobmanager/tasks/ReceiverFailingOnce.java | 50 ++++
.../flink/api/scala/ExecutionEnvironment.scala | 16 ++
33 files changed, 828 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 3dd9685..d5f9b94 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -183,6 +183,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// create the jobgraph object
JobGraph graph = new JobGraph(program.getJobName());
+ graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
graph.setAllowQueuedScheduling(false);
// add vertices to the graph
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index f7e93b4..f299ef4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -67,6 +67,11 @@ public class Plan implements Visitable<Operator<?>> {
* The default parallelism to use for nodes that have no explicitly specified parallelism.
*/
protected int defaultParallelism = DEFAULT_PARALELLISM;
+
+ /**
+ * The number of times failed tasks are re-executed.
+ */
+ protected int numberOfExecutionRetries;
/**
* Hash map for files in the distributed cache: registered name to cache entry.
@@ -259,6 +264,31 @@ public class Plan implements Visitable<Operator<?>> {
}
/**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+ */
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+ if (numberOfExecutionRetries < -1) {
+ throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+ }
+ this.numberOfExecutionRetries = numberOfExecutionRetries;
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of {@code -1} indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ */
+ public int getNumberOfExecutionRetries() {
+ return numberOfExecutionRetries;
+ }
+
+ /**
* Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
* for data types and is specific to a particular data model (record, tuple, Scala, ...)
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 05b7047..75ebe54 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.configuration;
/**
@@ -36,6 +35,12 @@ public final class ConfigConstants {
*/
public static final String DEFAULT_PARALLELIZATION_DEGREE_KEY = "parallelization.degree.default";
+ /**
+ * Config parameter for the number of re-tries for failed tasks. Setting this
+ * value to 0 effectively disables fault tolerance.
+ */
+ public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default";
+
// -------------------------------- Runtime -------------------------------
/**
@@ -313,6 +318,11 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_PARALLELIZATION_DEGREE = 1;
+ /**
+ * The default number of execution retries.
+ */
+ public static final int DEFAULT_EXECUTION_RETRIES = 0;
+
// ------------------------------ Runtime ---------------------------------
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 54e36c0..6b95ad8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -96,6 +96,8 @@ public abstract class ExecutionEnvironment {
private int degreeOfParallelism = -1;
+ private int numberOfExecutionRetries = -1;
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
@@ -144,6 +146,31 @@ public abstract class ExecutionEnvironment {
}
/**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+ */
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+ if (numberOfExecutionRetries < -1) {
+ throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+ }
+ this.numberOfExecutionRetries = numberOfExecutionRetries;
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of {@code -1} indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ */
+ public int getNumberOfExecutionRetries() {
+ return numberOfExecutionRetries;
+ }
+
+ /**
* Gets the UUID by which this environment is identified. The UUID sets the execution context
* in the cluster or local environment.
*
@@ -652,6 +679,7 @@ public abstract class ExecutionEnvironment {
if (getDegreeOfParallelism() > 0) {
plan.setDefaultParallelism(getDegreeOfParallelism());
}
+ plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries);
try {
registerCachedFilesWithPlan(plan);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2f881d7..3cc6b02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -391,13 +391,13 @@ public class Execution {
if (transitionState(current, FINISHED)) {
try {
- vertex.executionFinished();
- return;
+ assignedResource.releaseSlot();
+ vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.getExecutionGraph().deregisterExecution(this);
- assignedResource.releaseSlot();
+ vertex.executionFinished();
}
+ return;
}
}
else if (current == CANCELING) {
@@ -433,14 +433,14 @@ public class Execution {
if (current == CANCELED) {
return;
}
- else if (current == CANCELING || current == RUNNING) {
+ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
if (transitionState(current, CANCELED)) {
try {
- vertex.executionCanceled();
+ assignedResource.releaseSlot();
+ vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
- vertex.getExecutionGraph().deregisterExecution(this);
- assignedResource.releaseSlot();
+ vertex.executionCanceled();
}
return;
}
@@ -493,13 +493,13 @@ public class Execution {
this.failureCause = t;
try {
- vertex.getExecutionGraph().deregisterExecution(this);
- vertex.executionFailed(t);
- }
- finally {
if (assignedResource != null) {
assignedResource.releaseSlot();
}
+ vertex.getExecutionGraph().deregisterExecution(this);
+ }
+ finally {
+ vertex.executionFailed(t);
}
if (!isCallback && (current == RUNNING || current == DEPLOYING)) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
index 918a0ca..92ca394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionEdge.java
@@ -28,9 +28,9 @@ public class ExecutionEdge {
private final int inputNum;
- private final ChannelID inputChannelId;
+ private ChannelID inputChannelId;
- private final ChannelID outputChannelId;
+ private ChannelID outputChannelId;
public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum) {
@@ -42,15 +42,6 @@ public class ExecutionEdge {
this.outputChannelId = new ChannelID();
}
- public ExecutionEdge(IntermediateResultPartition source, ExecutionVertex target, int inputNum, ChannelID inputChannelId, ChannelID outputChannelId) {
- this.source = source;
- this.target = target;
- this.inputNum = inputNum;
-
- this.inputChannelId = inputChannelId;
- this.outputChannelId = outputChannelId;
- }
-
public IntermediateResultPartition getSource() {
return source;
@@ -71,4 +62,9 @@ public class ExecutionEdge {
public ChannelID getOutputChannelId() {
return outputChannelId;
}
+
+ public void assignNewChannelIDs() {
+ inputChannelId = new ChannelID();
+ outputChannelId = new ChannelID();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1954d70..9a33dbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
@@ -106,6 +105,8 @@ public class ExecutionGraph {
private int nextVertexToFinish;
+ private int numberOfRetriesLeft;
+
private volatile JobStatus state = JobStatus.CREATED;
private volatile Throwable failureCause;
@@ -147,6 +148,17 @@ public class ExecutionGraph {
// --------------------------------------------------------------------------------------------
+ public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
+ if (numberOfRetriesLeft < -1) {
+ throw new IllegalArgumentException();
+ }
+ this.numberOfRetriesLeft = numberOfRetriesLeft;
+ }
+
+ public int getNumberOfRetriesLeft() {
+ return numberOfRetriesLeft;
+ }
+
public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -344,8 +356,14 @@ public class ExecutionGraph {
public void waitForJobEnd(long timeout) throws InterruptedException {
synchronized (progressLock) {
- while (nextVertexToFinish < verticesInCreationOrder.size()) {
- progressLock.wait(timeout);
+
+ long now = System.currentTimeMillis();
+ long deadline = timeout == 0 ? Long.MAX_VALUE : now + timeout;
+
+
+ while (now < deadline && !state.isTerminalState()) {
+ progressLock.wait(deadline - now);
+ now = System.currentTimeMillis();
}
}
}
@@ -403,8 +421,21 @@ public class ExecutionGraph {
if (current == JobStatus.CANCELLING && transitionState(current, JobStatus.CANCELED)) {
break;
}
- if (current == JobStatus.FAILING && transitionState(current, JobStatus.FAILED, failureCause)) {
- break;
+ if (current == JobStatus.FAILING) {
+ if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
+ numberOfRetriesLeft--;
+
+ execute(new Runnable() {
+ @Override
+ public void run() {
+ restart();
+ }
+ });
+ break;
+ }
+ else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+ break;
+ }
}
if (current == JobStatus.CANCELED || current == JobStatus.CREATED || current == JobStatus.FINISHED) {
fail(new Exception("ExecutionGraph went into final state from state " + current));
@@ -659,4 +690,38 @@ public class ExecutionGraph {
action.run();
}
}
+
+ public void restart() {
+ try {
+ if (state == JobStatus.FAILED) {
+ transitionState(JobStatus.FAILED, JobStatus.RESTARTING);
+ }
+ synchronized (progressLock) {
+ if (state != JobStatus.RESTARTING) {
+ throw new IllegalStateException("Can only restart job from state restarting.");
+ }
+ if (scheduler == null) {
+ throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null.");
+ }
+
+ this.currentExecutions.clear();
+ this.edges.clear();
+
+ for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
+ jv.resetForNewExecution();
+ }
+
+ for (int i = 0; i < stateTimestamps.length; i++) {
+ stateTimestamps[i] = 0;
+ }
+ nextVertexToFinish = 0;
+ transitionState(JobStatus.RESTARTING, JobStatus.CREATED);
+ }
+
+ scheduleForExecution(scheduler);
+ }
+ catch (Throwable t) {
+ fail(t);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 37a1893..73534f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -56,19 +56,20 @@ public class ExecutionJobVertex {
private final List<IntermediateResult> inputs;
- private final InputSplitAssigner splitAssigner;
-
private final int parallelism;
private final boolean[] finishedSubtasks;
private volatile int numSubtasksInFinalState;
-
private final SlotSharingGroup slotSharingGroup;
private final CoLocationGroup coLocationGroup;
+ private final InputSplit[] inputSplits;
+
+ private InputSplitAssigner splitAssigner;
+
public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex, int defaultParallelism) throws JobException {
this(graph, jobVertex, defaultParallelism, System.currentTimeMillis());
@@ -126,9 +127,10 @@ public class ExecutionJobVertex {
@SuppressWarnings("unchecked")
InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
if (splitSource != null) {
- InputSplit[] splits = splitSource.createInputSplits(numTaskVertices);
- this.splitAssigner = splitSource.getInputSplitAssigner(splits);
+ this.inputSplits = splitSource.createInputSplits(numTaskVertices);
+ this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
} else {
+ this.inputSplits = null;
this.splitAssigner = null;
}
}
@@ -259,6 +261,48 @@ public class ExecutionJobVertex {
}
}
+ public void resetForNewExecution() {
+ if (!(numSubtasksInFinalState == 0 || numSubtasksInFinalState == parallelism)) {
+ throw new IllegalStateException("Cannot reset vertex that is not in final state");
+ }
+
+ synchronized (stateMonitor) {
+ // check and reset the sharing groups with scheduler hints
+ if (slotSharingGroup != null) {
+ slotSharingGroup.clearTaskAssignment();
+ }
+ if (coLocationGroup != null) {
+ coLocationGroup.resetConstraints();
+ }
+
+ // reset vertices one by one. if one reset fails, the "vertices in final state"
+ // fields will be consistent to handle triggered cancel calls
+ for (int i = 0; i < parallelism; i++) {
+ taskVertices[i].resetForNewExecution();
+ if (finishedSubtasks[i]) {
+ finishedSubtasks[i] = false;
+ numSubtasksInFinalState--;
+ }
+ }
+
+ if (numSubtasksInFinalState != 0) {
+ throw new RuntimeException("Bug: resetting the execution job vertex failed.");
+ }
+
+ // set up the input splits again
+ try {
+ if (this.inputSplits != null) {
+ @SuppressWarnings("unchecked")
+ InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
+ this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
+ }
+ }
+ catch (Throwable t) {
+ throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
+ }
+ }
+ }
+
//---------------------------------------------------------------------------------------------
// Notifications
//---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 3ea1afc..26dd19e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -67,7 +67,7 @@ public class ExecutionVertex {
private final List<Execution> priorExecutions;
- private final CoLocationConstraint locationConstraint;
+ private volatile CoLocationConstraint locationConstraint;
private volatile Execution currentExecution; // this field must never be null
@@ -316,6 +316,21 @@ public class ExecutionVertex {
if (state == FINISHED || state == CANCELED || state == FAILED) {
priorExecutions.add(execution);
currentExecution = new Execution(this, execution.getAttemptNumber()+1, System.currentTimeMillis());
+
+ CoLocationGroup grp = jobVertex.getCoLocationGroup();
+ if (grp != null) {
+ this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
+ }
+
+ // temp: assign new channel IDs.
+ ExecutionGraph graph = getExecutionGraph();
+
+ for (ExecutionEdge[] input : this.inputEdges) {
+ for (ExecutionEdge e : input) {
+ e.assignNewChannelIDs();
+ graph.registerExecutionEdge(e);
+ }
+ }
}
else {
throw new IllegalStateException("Cannot reset a vertex that is in state " + state);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 88450c2..0cafcec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -157,6 +157,10 @@ public class Instance {
// --------------------------------------------------------------------------------------------
public TaskOperationProtocol getTaskManagerProxy() throws IOException {
+ if (isDead) {
+ throw new IOException("Instance has died");
+ }
+
TaskOperationProtocol tm = this.taskManager;
if (tm == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index e48f3af..5f302e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -396,7 +396,9 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
}
}
- this.receiverCache.put(sourceChannelID, receiverList);
+ if (channels.containsKey(sourceChannelID)) {
+ this.receiverCache.put(sourceChannelID, receiverList);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Receiver for %s: %s [%s])",
@@ -659,4 +661,16 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
}
}
}
+
+ public void verifyAllCachesEmpty() {
+ if (!channels.isEmpty()) {
+ throw new IllegalStateException("Channel manager caches not empty: There are still registered channels.");
+ }
+ if (!localBuffersPools.isEmpty()) {
+ throw new IllegalStateException("Channel manager caches not empty: There are still local buffer pools.");
+ }
+ if (!receiverCache.isEmpty()) {
+ throw new IllegalStateException("Channel manager caches not empty: There are still entries in the receiver cache.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index c42bf92..4a8ca11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -71,6 +71,10 @@ public class JobGraph implements IOReadableWritable {
/** Name of this job. */
private String jobName;
+ /** The number of times that failed tasks should be re-executed */
+ private int numExecutionRetries;
+
+ /** flag to enable queued scheduling */
private boolean allowQueuedScheduling;
// --------------------------------------------------------------------------------------------
@@ -165,6 +169,31 @@ public class JobGraph implements IOReadableWritable {
return this.jobConfiguration;
}
+ /**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+ */
+ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+ if (numberOfExecutionRetries < -1) {
+ throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+ }
+ this.numExecutionRetries = numberOfExecutionRetries;
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of {@code -1} indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ */
+ public int getNumberOfExecutionRetries() {
+ return numExecutionRetries;
+ }
+
public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
this.allowQueuedScheduling = allowQueuedScheduling;
}
@@ -318,6 +347,7 @@ public class JobGraph implements IOReadableWritable {
this.jobID.read(in);
this.jobName = StringValue.readString(in);
this.jobConfiguration.read(in);
+ this.numExecutionRetries = in.readInt();
this.allowQueuedScheduling = in.readBoolean();
final int numVertices = in.readInt();
@@ -347,6 +377,7 @@ public class JobGraph implements IOReadableWritable {
this.jobID.write(out);
StringValue.writeString(this.jobName, out);
this.jobConfiguration.write(out);
+ out.writeInt(numExecutionRetries);
out.writeBoolean(allowQueuedScheduling);
// write the task vertices using java serialization (to resolve references in the object graph)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 857d999..3722945 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -42,7 +42,10 @@ public enum JobStatus {
CANCELED(true),
/** All of the job's tasks have successfully finished. */
- FINISHED(true);
+ FINISHED(true),
+
+ /** The job is currently undergoing a reset and total restart */
+ RESTARTING(false);
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index c93eee3..5a32244 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -115,8 +115,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
/** Executor service for asynchronous commands (to relieve the RPC threads of work) */
- private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware
- .getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
+ private final ExecutorService executorService = Executors.newFixedThreadPool(2 * Hardware.getNumberCPUCores(), ExecutorThreadFactory.INSTANCE);
/** The RPC end point through which the JobManager gets its calls */
@@ -140,7 +139,9 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
private final int recommendedClientPollingInterval;
// end: these will be consolidated / removed
-
+
+ private final int defaultExecutionRetries;
+
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
private volatile boolean isShutDown;
@@ -173,6 +174,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
// Read the suggested client polling interval
this.recommendedClientPollingInterval = GlobalConfiguration.getInteger(
ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL);
+
+ // read the default number of times that failed tasks should be re-executed
+ this.defaultExecutionRetries = GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
// Load the job progress collector
this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
@@ -326,6 +331,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
executionGraph = new ExecutionGraph(job.getJobID(), job.getName(),
job.getJobConfiguration(), job.getUserJarBlobKeys(), this.executorService);
+
+ executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
+ job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
+
ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
if (previous != null) {
throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index f554bbb..36430de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -63,7 +63,13 @@ public class CoLocationConstraint {
return this.sharedSlot == null;
}
+ public boolean isUnassignedOrDisposed() {
+ return this.sharedSlot == null || this.sharedSlot.isDisposed();
+ }
+
public AbstractID getGroupId() {
return this.group.getId();
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
index d1c3bd5..fa379cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
@@ -74,7 +74,7 @@ public class CoLocationGroup implements java.io.Serializable {
return constraints.get(subtask);
}
- public void ensureConstraints(int num) {
+ private void ensureConstraints(int num) {
if (constraints == null) {
constraints = new ArrayList<CoLocationConstraint>(num);
} else {
@@ -92,4 +92,13 @@ public class CoLocationGroup implements java.io.Serializable {
public AbstractID getId() {
return id;
}
+
+ public void resetConstraints() {
+ for (CoLocationConstraint c : this.constraints) {
+ if (!c.isUnassignedOrDisposed()) {
+ throw new IllegalStateException("Cannot reset co-location group: some constraints still have executing vertices.");
+ }
+ }
+ this.constraints.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
index c5a88f3..dcde6b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java
@@ -70,6 +70,15 @@ public class SlotSharingGroup implements java.io.Serializable {
return this.taskAssignment;
}
+ public void clearTaskAssignment() {
+ if (this.taskAssignment != null) {
+ if (this.taskAssignment.getNumberOfSlots() > 0) {
+ throw new IllegalStateException("SlotSharingGroup cannot clear task assignment, group still has allocated resources.");
+ }
+ }
+ this.taskAssignment = null;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 606fff1..df32a81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -21,11 +21,9 @@ package org.apache.flink.runtime.execution.librarycache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.jobgraph.JobID;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -33,7 +31,6 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -90,11 +87,15 @@ public class BlobLibraryCacheManagerTest {
}
assertEquals(2, caughtExceptions);
- }catch(Exception e){
+
+ bc.close();
+ }
+ catch(Exception e){
e.printStackTrace();
fail(e.getMessage());
- }finally{
- if(server != null){
+ }
+ finally{
+ if (server != null){
try {
server.shutdown();
} catch (IOException e) {
@@ -102,7 +103,7 @@ public class BlobLibraryCacheManagerTest {
}
}
- if(libraryCacheManager != null){
+ if (libraryCacheManager != null){
try {
libraryCacheManager.shutdown();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 74ab08b..4cddcbd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -27,17 +27,14 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doAnswer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
new file mode 100644
index 0000000..f1855f2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.junit.Test;
+
+public class ExecutionGraphRestartTest {
+
+	/** Parallelism of the single job vertex that every test in this class executes. */
+	private static final int NUM_TASKS = 31;
+
+	/**
+	 * Runs a graph with zero retries left, fails one vertex, expects the graph to be
+	 * FAILED, and verifies that an explicit call to {@code restart()} puts it back into
+	 * RUNNING, from where it is driven to FINISHED.
+	 */
+	@Test
+	public void testRestartManually() {
+		try {
+			final ExecutionGraph graph = createRunningGraph(0);
+
+			failFirstVertex(graph);
+			assertEquals(JobStatus.FAILED, graph.getState());
+
+			graph.restart();
+			assertEquals(JobStatus.RUNNING, graph.getState());
+
+			finishAllVertices(graph);
+			assertEquals(JobStatus.FINISHED, graph.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Runs a graph with one retry left, fails one vertex, and expects the graph to be
+	 * RUNNING again without an explicit {@code restart()} call before it is driven
+	 * to FINISHED.
+	 */
+	@Test
+	public void testRestartSelf() {
+		try {
+			final ExecutionGraph graph = createRunningGraph(1);
+
+			failFirstVertex(graph);
+
+			// should have restarted itself
+			assertEquals(JobStatus.RUNNING, graph.getState());
+
+			finishAllVertices(graph);
+			assertEquals(JobStatus.FINISHED, graph.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Shared steps
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Builds a one-vertex job with parallelism {@link #NUM_TASKS}, attaches it to a fresh
+	 * execution graph with the given number of retries, schedules it, and asserts the
+	 * CREATED and RUNNING states along the way.
+	 *
+	 * @param retriesLeft value passed to {@code setNumberOfRetriesLeft}
+	 * @return the scheduled graph, already asserted to be RUNNING
+	 */
+	private static ExecutionGraph createRunningGraph(int retriesLeft) throws Exception {
+		final TaskOperationProtocol taskManager = getSimpleAcknowledgingTaskmanager();
+		final Instance instance = getInstance(taskManager);
+
+		final Scheduler scheduler = new Scheduler();
+		scheduler.newInstanceAvailable(instance);
+
+		// The job:
+		final AbstractJobVertex vertex = new AbstractJobVertex("Task");
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.setParallelism(NUM_TASKS);
+
+		final JobGraph jobGraph = new JobGraph("Pointwise Job", vertex);
+
+		final ExecutionGraph graph = new ExecutionGraph(new JobID(), "test job", new Configuration());
+		graph.setNumberOfRetriesLeft(retriesLeft);
+		graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+		assertEquals(JobStatus.CREATED, graph.getState());
+
+		graph.scheduleForExecution(scheduler);
+		assertEquals(JobStatus.RUNNING, graph.getState());
+
+		return graph;
+	}
+
+	/** Fails the first execution vertex returned by the graph's vertex iterator. */
+	private static void failFirstVertex(ExecutionGraph graph) throws Exception {
+		graph.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+	}
+
+	/** Calls {@code executionFinished()} on every execution vertex of the graph. */
+	private static void finishAllVertices(ExecutionGraph graph) throws Exception {
+		for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
+			vertex.executionFinished();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 7eefa7e..30b05ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -96,11 +96,15 @@ public class ExecutionGraphTestUtils {
// --------------------------------------------------------------------------------------------
public static Instance getInstance(final TaskOperationProtocol top) throws Exception {
+ return getInstance(top, 1);
+ }
+
+ public static Instance getInstance(final TaskOperationProtocol top, int numSlots) throws Exception {
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10000, 10001);
- return new Instance(connection, new InstanceID(), hardwareDescription, 1) {
+ return new Instance(connection, new InstanceID(), hardwareDescription, numSlots) {
@Override
public TaskOperationProtocol getTaskManagerProxy() {
return top;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f5a4d39..2848466 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -22,12 +22,9 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
-import java.util.ArrayList;
import java.util.Arrays;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.JobID;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 1f74ae3..9769529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -26,12 +26,9 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import java.io.IOException;
-import java.util.ArrayList;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index efb2af4..f3081bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -28,10 +28,8 @@ import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Matchers.any;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobID;
@@ -43,8 +41,6 @@ import org.junit.Test;
import org.mockito.Matchers;
-import java.util.ArrayList;
-
public class ExecutionVertexDeploymentTest {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 8ed7a6d..168c454 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -36,6 +36,10 @@ public class JobManagerTestUtils {
}
public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager) throws Exception {
+ return startJobManager(numTaskManagers, numSlotsPerTaskManager, null);
+ }
+
+ public static final JobManager startJobManager(int numTaskManagers, int numSlotsPerTaskManager, Configuration additionalParams) throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getAvailablePort());
@@ -43,6 +47,10 @@ public class JobManagerTestUtils {
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+ if (additionalParams != null) {
+ cfg.addAll(additionalParams);
+ }
+
GlobalConfiguration.includeConfiguration(cfg);
JobManager jm = new JobManager(ExecutionMode.LOCAL);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
index f8b229f..23a75cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/CoLocationConstraintITCase.java
@@ -22,10 +22,8 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -38,8 +36,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.junit.Test;
-import java.util.ArrayList;
-
public class CoLocationConstraintITCase {
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 0952f60..ae7857f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -24,10 +24,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.api.RecordReader;
@@ -47,8 +45,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.apache.flink.runtime.types.IntegerRecord;
import org.junit.Test;
-import java.util.ArrayList;
-
/**
* This test is intended to cover the basic functionality of the {@link JobManager}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
new file mode 100644
index 0000000..0b8518f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobManager;
+import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.AbstractJobResult;
+import org.apache.flink.runtime.client.JobSubmissionResult;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.instance.LocalInstanceManager;
+import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmanager.tasks.ReceiverBlockingOnce;
+import org.apache.flink.runtime.jobmanager.tasks.ReceiverFailingOnce;
+import org.apache.flink.runtime.jobmanager.tasks.Sender;
+import org.junit.Test;
+
+/**
+ * Integration tests for job recovery: each test submits a sender/receiver job that is
+ * allowed one execution retry and expects the job to reach {@link JobStatus#FINISHED}
+ * although the first attempt is disturbed, either by a receiver that throws once
+ * ({@link ReceiverFailingOnce}) or by shutting down one of the TaskManagers while the
+ * receivers block ({@link ReceiverBlockingOnce}).
+ */
+public class RecoveryITCase {
+
+	/** Parallelism of both the sender and the receiver vertex. */
+	private static final int NUM_TASKS = 31;
+
+	/**
+	 * Receiver fails once, no slot sharing; the JobManager is started with
+	 * {@code 2 * NUM_TASKS} as its argument.
+	 */
+	@Test
+	public void testForwardJob() {
+
+		ReceiverFailingOnce.resetFailedBefore();
+
+		JobManager jm = null;
+
+		try {
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			receiver.setInvokableClass(ReceiverFailingOnce.class);
+
+			final JobGraph jobGraph = createPointwiseJob(receiver, false);
+
+			jm = startJobManager(2 * NUM_TASKS);
+
+			final GlobalBufferPool bp = bufferPoolOfFirstTaskManager(jm);
+
+			final ExecutionGraph eg = submitAndLookUp(jm, jobGraph);
+			awaitFinished(eg);
+
+			// make sure that in any case, the network buffers are all returned
+			waitForTaskThreadsToBeTerminated();
+			assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Receiver fails once, sender and receiver share one slot sharing group; the
+	 * JobManager is started with {@code NUM_TASKS} as its argument.
+	 */
+	@Test
+	public void testForwardJobWithSlotSharing() {
+
+		ReceiverFailingOnce.resetFailedBefore();
+
+		JobManager jm = null;
+
+		try {
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			receiver.setInvokableClass(ReceiverFailingOnce.class);
+
+			final JobGraph jobGraph = createPointwiseJob(receiver, true);
+
+			jm = startJobManager(NUM_TASKS);
+
+			final GlobalBufferPool bp = bufferPoolOfFirstTaskManager(jm);
+
+			final ExecutionGraph eg = submitAndLookUp(jm, jobGraph);
+			awaitFinished(eg);
+
+			// make sure that in any case, the network buffers are all returned
+			waitForTaskThreadsToBeTerminated();
+			assertEquals(bp.numBuffers(), bp.numAvailableBuffers());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Receivers block on their first attempt; the first TaskManager is shut down and the
+	 * job is expected to finish afterwards. Sender and receiver share one slot sharing group.
+	 *
+	 * <p>NOTE(review): {@link ReceiverBlockingOnce} keeps its flag in a static field that
+	 * this test clears and never sets again, so the blocking behavior only applies the
+	 * first time this test runs within one JVM.
+	 */
+	@Test
+	public void testRecoverTaskManagerFailure() {
+
+		JobManager jm = null;
+
+		try {
+			final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+			receiver.setInvokableClass(ReceiverBlockingOnce.class);
+
+			final JobGraph jobGraph = createPointwiseJob(receiver, true);
+
+			// make sure we have fast heartbeats and failure detection
+			final Configuration cfg = new Configuration();
+			cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+
+			jm = startJobManager(2, NUM_TASKS, cfg);
+
+			final ExecutionGraph eg = submitAndLookUp(jm, jobGraph);
+
+			// wait for a bit until all is running, make sure the second attempt does not block
+			// NOTE(review): purely time based; nothing here verifies that the tasks have
+			// actually started within the 300 ms
+			Thread.sleep(300);
+			ReceiverBlockingOnce.setShouldNotBlock();
+
+			// shutdown one of the taskmanagers
+			((LocalInstanceManager) jm.getInstanceManager()).getTaskManagers()[0].shutdown();
+
+			// wait for the recovery to do its work
+			awaitFinished(eg);
+
+			// wait until the task threads are gone; unlike the other tests in this class,
+			// no buffer pool check is made here
+			waitForTaskThreadsToBeTerminated();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jm != null) {
+				jm.shutdown();
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Shared steps
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates the "Pointwise Job": a {@link Sender} vertex connected pointwise to the given
+	 * receiver vertex, both with parallelism {@link #NUM_TASKS}, and with the number of
+	 * execution retries set to one.
+	 *
+	 * @param receiver the receiver vertex; its invokable class must already be set
+	 * @param shareSlots whether both vertices are put into one new {@link SlotSharingGroup}
+	 * @return the job graph containing sender and receiver
+	 */
+	private static JobGraph createPointwiseJob(AbstractJobVertex receiver, boolean shareSlots) throws Exception {
+		final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+		sender.setInvokableClass(Sender.class);
+
+		sender.setParallelism(NUM_TASKS);
+		receiver.setParallelism(NUM_TASKS);
+
+		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
+
+		if (shareSlots) {
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+			sender.setSlotSharingGroup(sharingGroup);
+			receiver.setSlotSharingGroup(sharingGroup);
+		}
+
+		final JobGraph jobGraph = new JobGraph("Pointwise Job", sender, receiver);
+		jobGraph.setNumberOfExecutionRetries(1);
+		return jobGraph;
+	}
+
+	/** Returns the global buffer pool of the first TaskManager of the JobManager's local instance manager. */
+	private static GlobalBufferPool bufferPoolOfFirstTaskManager(JobManager jm) throws Exception {
+		return ((LocalInstanceManager) jm.getInstanceManager())
+				.getTaskManagers()[0].getChannelManager().getGlobalBufferPool();
+	}
+
+	/**
+	 * Submits the job, asserts that the submission returned SUCCESS (printing the result's
+	 * description first if it did not), and looks the job up among the JobManager's current jobs.
+	 *
+	 * @return the execution graph registered under the job's ID, or {@code null} if the
+	 *         lookup finds none
+	 */
+	private static ExecutionGraph submitAndLookUp(JobManager jm, JobGraph jobGraph) throws Exception {
+		final JobSubmissionResult result = jm.submitJob(jobGraph);
+
+		if (result.getReturnCode() != AbstractJobResult.ReturnCode.SUCCESS) {
+			System.out.println(result.getDescription());
+		}
+		assertEquals(AbstractJobResult.ReturnCode.SUCCESS, result.getReturnCode());
+
+		// monitor the execution
+		return jm.getCurrentJobs().get(jobGraph.getJobID());
+	}
+
+	/**
+	 * Waits for the end of the job and asserts that it FINISHED. A {@code null} graph is
+	 * accepted silently; the tests treat it as "the job was already done".
+	 */
+	private static void awaitFinished(ExecutionGraph eg) throws Exception {
+		if (eg == null) {
+			// already done, that was fast
+			return;
+		}
+		eg.waitForJobEnd();
+		assertEquals(JobStatus.FINISHED, eg.getState());
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
index 29293da..5d79aab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotSharingITCase.java
@@ -22,10 +22,8 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
@@ -39,8 +37,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Receiver;
import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.junit.Test;
-import java.util.ArrayList;
-
public class SlotSharingITCase {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
index 19ff690..6b8be15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.java
@@ -22,11 +22,9 @@ import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.startJobMana
import static org.apache.flink.runtime.jobgraph.JobManagerTestUtils.waitForTaskThreadsToBeTerminated;
import static org.junit.Assert.*;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.AbstractJobResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.LocalInstanceManager;
@@ -39,7 +37,6 @@ import org.apache.flink.runtime.jobmanager.tasks.Sender;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.junit.Test;
-import java.util.ArrayList;
public class TaskManagerFailsITCase {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
new file mode 100644
index 0000000..3425842
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverBlockingOnce.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+/**
+ * Test receiver whose {@link #invoke()} parks the calling thread if the static
+ * {@code shouldBlock} flag is set when the method starts, and otherwise reads its input
+ * until {@code reader.next()} returns {@code null}.
+ *
+ * <p>The flag starts out {@code true} and is cleared by {@link #setShouldNotBlock()}.
+ * This class has no method that sets it again.
+ */
+public final class ReceiverBlockingOnce extends AbstractInvokable {
+
+	// volatile: written through the static setter and read in invoke(), which may be
+	// running on a different thread than the caller of the setter
+	private static volatile boolean shouldBlock = true;
+
+	private RecordReader<IntegerRecord> reader;
+
+	/** Creates the record reader for the {@link IntegerRecord} input. */
+	@Override
+	public void registerInputOutput() {
+		reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+	}
+
+	/**
+	 * Blocks if the flag is still set, otherwise consumes and discards all input records.
+	 *
+	 * @throws Exception the {@link InterruptedException} that ends the blocking wait, or
+	 *         whatever the reader throws
+	 */
+	@Override
+	public void invoke() throws Exception {
+		if (shouldBlock) {
+			// The monitor is local to this call, so nobody can notify it: the wait can only
+			// end through an InterruptedException, which propagates out of invoke().
+			// Waiting in a loop keeps a spurious wakeup from releasing the task.
+			final Object lock = new Object();
+			synchronized (lock) {
+				while (true) {
+					lock.wait();
+				}
+			}
+		}
+
+		while (reader.next() != null) {
+			// records are read and discarded
+		}
+	}
+
+	/** Clears the flag so that subsequent calls to {@link #invoke()} do not block. */
+	public static void setShouldNotBlock() {
+		shouldBlock = false;
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
new file mode 100644
index 0000000..3fad6b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/tasks/ReceiverFailingOnce.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.tasks;
+
+import org.apache.flink.runtime.io.network.api.RecordReader;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.types.IntegerRecord;
+
+public final class ReceiverFailingOnce extends AbstractInvokable {
+
+    /**
+     * Static marker shared by all instances of this class, recording that the one-time
+     * failure has already been thrown. Declared volatile so that a write through
+     * {@link #resetFailedBefore()} is visible to whichever thread runs {@link #invoke()}.
+     */
+    private static volatile boolean hasFailedBefore = false;
+
+    private RecordReader<IntegerRecord> reader;
+
+    /** Creates the reader for the {@link IntegerRecord} input. */
+    @Override
+    public void registerInputOutput() {
+        reader = new RecordReader<IntegerRecord>(this, IntegerRecord.class);
+    }
+
+    /**
+     * Throws a test exception the first time the subtask with index 0 is invoked;
+     * every other invocation reads the input until the reader returns {@code null}.
+     *
+     * @throws Exception The deliberate one-time "Test exception", or any failure
+     *                   raised while reading the input.
+     */
+    @Override
+    public void invoke() throws Exception {
+        if (!hasFailedBefore && getEnvironment().getIndexInSubtaskGroup() == 0) {
+            // Mark first, so that a later invocation does not fail again.
+            hasFailedBefore = true;
+            throw new Exception("Test exception");
+        }
+
+        // Drain the input and discard every record.
+        while (reader.next() != null) {
+            // nothing to do with the record
+        }
+    }
+
+    /** Re-arms the one-time failure. */
+    public static void resetFailedBefore() {
+        hasFailedBefore = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/dd687bc6/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 53d57d2..ff96519 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -75,6 +75,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* value can be overridden by individual operations using [[DataSet.setParallelism]]
*/
def getDegreeOfParallelism = javaEnv.getDegreeOfParallelism
+
+ /**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of "-1" indicates that the system
+ * default value (as defined in the configuration) should be used.
+ */
+ def setNumberOfExecutionRetries(numRetries: Int): Unit =
+   // plain delegation to the wrapped Java environment
+   javaEnv.setNumberOfExecutionRetries(numRetries)
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of "-1" indicates that the system default value (as defined in the configuration)
+ * should be used.
+ */
+ def getNumberOfExecutionRetries: Int = javaEnv.getNumberOfExecutionRetries // explicit result type pins the public signature
/**
* Gets the UUID by which this environment is identified. The UUID sets the execution context
[2/5] git commit: [FLINK-1202] Remove incomplete file outputs on
failure
Posted by se...@apache.org.
[FLINK-1202] Remove incomplete file outputs on failure
This closes #175
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a747b614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a747b614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a747b614
Branch: refs/heads/master
Commit: a747b6146ed5d5766b42e6bed3c2e7a811e8d00e
Parents: f42dcc3
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 3 11:47:51 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:04:46 2014 +0100
----------------------------------------------------------------------
.../api/common/io/CleanupWhenUnsuccessful.java | 32 +++++++++++++++++
.../flink/api/common/io/FileOutputFormat.java | 37 +++++++++++++++-----
.../flink/runtime/operators/DataSinkTask.java | 26 ++++++++++++++
.../runtime/operators/DataSinkTaskTest.java | 7 ++--
4 files changed, 90 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
new file mode 100644
index 0000000..4b912e1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CleanupWhenUnsuccessful.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+/**
+ * {@link OutputFormat}s may implement this interface to run a cleanup hook when the execution is not successful.
+ */
+public interface CleanupWhenUnsuccessful {
+
+ /**
+ * Hook that is called upon an unsuccessful execution, so that the format can discard partial output (for example, {@code FileOutputFormat} deletes the file it created).
+ * {@code DataSinkTask} calls it from both its exception handler and its cancel path, guarded by a flag so that it is attempted only once, and only logs anything thrown here.
+ *
+ * @throws Exception The method may forward exceptions when the cleanup fails.
+ */
+ void tryCleanupOnError() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index a9beddb..bc7ab73 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common.io;
+import java.io.FileNotFoundException;
import java.io.IOException;
import org.slf4j.Logger;
@@ -35,7 +36,7 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
* The abstract base class for all output formats that are file based. Contains the logic to open/close the target
* file streams.
*/
-public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster {
+public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, InitializeOnMaster, CleanupWhenUnsuccessful {
private static final long serialVersionUID = 1L;
@@ -106,10 +107,14 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
// --------------------------------------------------------------------------------------------
- /**
- * The stream to which the data is written;
- */
+ /** The stream to which the data is written; */
protected transient FSDataOutputStream stream;
+
+ /** The path that is actually written to (may be a file in the directory defined by {@code outputFilePath}). */
+ private transient Path actualFilePath;
+
+ /** Flag indicating whether this format actually created a file, which should be removed on cleanup. */
+ private transient boolean fileCreated;
// --------------------------------------------------------------------------------------------
@@ -231,12 +236,13 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
// Suffix the path with the parallel instance index, if needed
- if (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) {
- p = p.suffix("/" + (taskNumber+1));
- }
+ this.actualFilePath = (numTasks > 1 || outputDirectoryMode == OutputDirectoryMode.ALWAYS) ? p.suffix("/" + (taskNumber+1)) : p;
// create output file
- this.stream = fs.create(p, writeMode == WriteMode.OVERWRITE);
+ this.stream = fs.create(this.actualFilePath, writeMode == WriteMode.OVERWRITE);
+
+ // at this point, the file creation must have succeeded, or an exception has been thrown
+ this.fileCreated = true;
}
@Override
@@ -283,6 +289,21 @@ public abstract class FileOutputFormat<IT> implements OutputFormat<IT>, Initiali
}
}
+ /**
+  * Makes a best-effort attempt to delete the file that this format created.
+  * Does nothing if no file was created, or if cleanup was already attempted.
+  * This implementation never throws: a missing file is ignored, and any other
+  * failure is logged together with its cause.
+  */
+ @Override
+ public void tryCleanupOnError() {
+     if (!this.fileCreated) {
+         return;
+     }
+
+     // Reset the flag first, so that a repeated call does not attempt a second delete.
+     this.fileCreated = false;
+
+     try {
+         FileSystem.get(this.actualFilePath.toUri()).delete(this.actualFilePath, false);
+     } catch (FileNotFoundException e) {
+         // ignore, may not be visible yet or may be already removed
+     } catch (Throwable t) {
+         // Pass the throwable to the logger so the cause and stack trace are not lost.
+         LOG.error("Could not remove the incomplete file " + this.actualFilePath, t);
+     }
+ }
+
// ============================================================================================
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 1a378b2..b1185c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -78,6 +79,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
// cancel flag
private volatile boolean taskCanceled;
+ private volatile boolean cleanupCalled;
+
@Override
public void registerInputOutput() {
@@ -180,6 +183,18 @@ public class DataSinkTask<IT> extends AbstractInvokable {
}
}
catch (Exception ex) {
+
+ // make a best effort to clean up
+ try {
+ if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+ cleanupCalled = true;
+ ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+ }
+ }
+ catch (Throwable t) {
+ LOG.error("Cleanup on error failed.", t);
+ }
+
ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
if (ex instanceof CancelTaskException) {
@@ -237,6 +252,17 @@ public class DataSinkTask<IT> extends AbstractInvokable {
try {
this.format.close();
} catch (Throwable t) {}
+
+ // make a best effort to clean up
+ try {
+ if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
+ cleanupCalled = true;
+ ((CleanupWhenUnsuccessful) format).tryCleanupOnError();
+ }
+ }
+ catch (Throwable t) {
+ LOG.error("Cleanup on error failed.", t);
+ }
}
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a747b614/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index f5381e4..3219a21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -306,7 +306,7 @@ public class DataSinkTaskTest extends TaskTestBase
// assert that the incomplete temp file was removed again
File tempTestFile = new File(this.tempTestPath);
- Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+ Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
}
@@ -347,7 +347,7 @@ public class DataSinkTaskTest extends TaskTestBase
// assert that the incomplete temp file was removed again
File tempTestFile = new File(this.tempTestPath);
- Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
+ Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
}
@@ -388,8 +388,7 @@ public class DataSinkTaskTest extends TaskTestBase
// assert that the incomplete temp file was removed again
File tempTestFile = new File(this.tempTestPath);
- Assert.assertTrue("Temp output file does not exist",tempTestFile.exists());
-
+ Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
}
@Test