You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/03 17:04:13 UTC
[5/5] git commit: Introduce a delay before restarts to make sure that
taskmanager failures are detected before restart.
Introduce a delay before restarts to make sure that taskmanager failures are detected before restart.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f0fd8823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f0fd8823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f0fd8823
Branch: refs/heads/master
Commit: f0fd8823ee8d157f9a5a00b2687827b94206d7b7
Parents: dd687bc
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 6 22:01:36 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:14 2014 +0100
----------------------------------------------------------------------
.../api/common/typeutils/TypeSerializer.java | 4 ++--
.../flink/configuration/ConfigConstants.java | 2 +-
.../runtime/executiongraph/ExecutionGraph.java | 18 ++++++++++++++++++
.../flink/runtime/instance/InstanceManager.java | 8 ++++++--
.../flink/runtime/jobmanager/JobManager.java | 8 ++++++++
.../runtime/instance/InstanceManagerTest.java | 12 +++++-------
.../runtime/jobgraph/JobManagerTestUtils.java | 2 ++
.../flink/runtime/jobmanager/RecoveryITCase.java | 4 ++--
8 files changed, 44 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 87d7e20..5e32c86 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -117,7 +117,7 @@ public abstract class TypeSerializer<T> implements Serializable {
* De-serializes a record from the given source input view.
*
* @param source The input view from which to read the data.
- * @result The deserialized element.
+ * @return The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
@@ -129,7 +129,7 @@ public abstract class TypeSerializer<T> implements Serializable {
*
* @param reuse The record instance into which to de-serialize the data.
* @param source The input view from which to read the data.
- * @result The deserialized element.
+ * @return The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 75ebe54..dcd9342 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -344,7 +344,7 @@ public final class ConfigConstants {
* Default number of milliseconds after which a task manager is marked as failed.
*/
// 30 seconds (it's enough to get to mars, should be enough to detect failure)
- public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30;
+ public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30*1000;
/**
* The default network port the task manager expects incoming IPC connections. The {@code -1} means that
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9a33dbf..3df3452 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -107,6 +107,8 @@ public class ExecutionGraph {
private int numberOfRetriesLeft;
+ private long delayBeforeRetrying;
+
private volatile JobStatus state = JobStatus.CREATED;
private volatile Throwable failureCause;
@@ -159,6 +161,17 @@ public class ExecutionGraph {
return numberOfRetriesLeft;
}
+ public void setDelayBeforeRetrying(long delayBeforeRetrying) {
+ if (delayBeforeRetrying < 0) {
+ throw new IllegalArgumentException("Delay before retry must be non-negative.");
+ }
+ this.delayBeforeRetrying = delayBeforeRetrying;
+ }
+
+ public long getDelayBeforeRetrying() {
+ return delayBeforeRetrying;
+ }
+
public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -428,6 +441,11 @@ public class ExecutionGraph {
execute(new Runnable() {
@Override
public void run() {
+ try {
+ Thread.sleep(delayBeforeRetrying);
+ } catch (InterruptedException e) {
+ // should only happen on shutdown
+ }
restart();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 8127f1d..ced1afe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -77,7 +77,7 @@ public class InstanceManager {
* where a task manager is still considered alive.
*/
public InstanceManager() {
- this(1000 * GlobalConfiguration.getLong(
+ this(GlobalConfiguration.getLong(
ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
}
@@ -98,6 +98,10 @@ public class InstanceManager {
new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
}
+
+ public long getHeartbeatTimeout() {
+ return heartbeatTimeout;
+ }
public void shutdown() {
synchronized (this.lock) {
@@ -126,7 +130,7 @@ public class InstanceManager {
synchronized (this.lock) {
if (this.shutdown) {
- throw new IllegalStateException("InstanceManager is shut down.");
+ return false;
}
Instance host = registeredHostsById.get(instanceId);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 5a32244..4bce4a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -142,6 +142,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
private final int defaultExecutionRetries;
+ private final long delayBetweenRetries;
+
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
private volatile boolean isShutDown;
@@ -179,6 +181,11 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
this.defaultExecutionRetries = GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+ // delay between retries should be two heartbeat timeouts
+ this.delayBetweenRetries = 2 * GlobalConfiguration.getLong(
+ ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT);
+
// Load the job progress collector
this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
@@ -334,6 +341,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
+ executionGraph.setDelayBeforeRetrying(this.delayBetweenRetries);
ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
if (previous != null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 34d86b4..ecdf891 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -43,6 +43,10 @@ public class InstanceManagerTest {
try {
InstanceManager cm = new InstanceManager();
+ // catches error that some parts assumed config values in seconds, others in
+ // milliseconds by verifying that the timeout is not larger than 2 minutes.
+ assertTrue(cm.getHeartbeatTimeout() < 2 * 60 * 1000);
+
final int ipcPort = 10000;
final int dataPort = 20000;
@@ -182,13 +186,7 @@ public class InstanceManagerTest {
// expected
}
- try {
- cm.reportHeartBeat(new InstanceID());
- fail("Should raise exception in shutdown state");
- }
- catch (IllegalStateException e) {
- // expected
- }
+ assertFalse(cm.reportHeartBeat(new InstanceID()));
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 168c454..cc767c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -46,6 +46,8 @@ public class JobManagerTestUtils {
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
if (additionalParams != null) {
cfg.addAll(additionalParams);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
index 0b8518f..9e259f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
@@ -200,8 +200,8 @@ public class RecoveryITCase {
// make sure we have fast heartbeats and failure detection
Configuration cfg = new Configuration();
- cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
- cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 3000);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 1000);
jm = startJobManager(2, NUM_TASKS, cfg);