You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/03 17:04:13 UTC

[5/5] git commit: Introduce a delay before restarts to make sure that taskmanager failures are detected before restart.

Introduce a delay before restarts to make sure that taskmanager failures are detected before restart.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f0fd8823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f0fd8823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f0fd8823

Branch: refs/heads/master
Commit: f0fd8823ee8d157f9a5a00b2687827b94206d7b7
Parents: dd687bc
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 6 22:01:36 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 3 16:08:14 2014 +0100

----------------------------------------------------------------------
 .../api/common/typeutils/TypeSerializer.java      |  4 ++--
 .../flink/configuration/ConfigConstants.java      |  2 +-
 .../runtime/executiongraph/ExecutionGraph.java    | 18 ++++++++++++++++++
 .../flink/runtime/instance/InstanceManager.java   |  8 ++++++--
 .../flink/runtime/jobmanager/JobManager.java      |  8 ++++++++
 .../runtime/instance/InstanceManagerTest.java     | 12 +++++-------
 .../runtime/jobgraph/JobManagerTestUtils.java     |  2 ++
 .../flink/runtime/jobmanager/RecoveryITCase.java  |  4 ++--
 8 files changed, 44 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 87d7e20..5e32c86 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -117,7 +117,7 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 * De-serializes a record from the given source input view.
 	 * 
 	 * @param source The input view from which to read the data.
-	 * @result The deserialized element.
+	 * @return The deserialized element.
 	 * 
 	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
 	 *                     input view, which may have an underlying I/O channel from which it reads.
@@ -129,7 +129,7 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 * 
 	 * @param reuse The record instance into which to de-serialize the data.
 	 * @param source The input view from which to read the data.
-	 * @result The deserialized element.
+	 * @return The deserialized element.
 	 * 
 	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
 	 *                     input view, which may have an underlying I/O channel from which it reads.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 75ebe54..dcd9342 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -344,7 +344,7 @@ public final class ConfigConstants {
 	 * Default number of seconds after which a task manager is marked as failed.
 	 */
 	// 30 seconds (its enough to get to mars, should be enough to detect failure)
-	public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30;
+	public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30*1000;
 	
 	/**
 	 * The default network port the task manager expects incoming IPC connections. The {@code -1} means that

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9a33dbf..3df3452 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -107,6 +107,8 @@ public class ExecutionGraph {
 	
 	private int numberOfRetriesLeft;
 	
+	private long delayBeforeRetrying;
+	
 	private volatile JobStatus state = JobStatus.CREATED;
 	
 	private volatile Throwable failureCause;
@@ -159,6 +161,17 @@ public class ExecutionGraph {
 		return numberOfRetriesLeft;
 	}
 	
+	public void setDelayBeforeRetrying(long delayBeforeRetrying) {
+		if (delayBeforeRetrying < 0) {
+			throw new IllegalArgumentException("Delay before retry must be non-negative.");
+		}
+		this.delayBeforeRetrying = delayBeforeRetrying;
+	}
+	
+	public long getDelayBeforeRetrying() {
+		return delayBeforeRetrying;
+	}
+	
 	public void attachJobGraph(List<AbstractJobVertex> topologiallySorted) throws JobException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
@@ -428,6 +441,11 @@ public class ExecutionGraph {
 								execute(new Runnable() {
 									@Override
 									public void run() {
+										try {
+											Thread.sleep(delayBeforeRetrying);
+										} catch (InterruptedException e) {
+											// should only happen on shutdown
+										}
 										restart();
 									}
 								});

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 8127f1d..ced1afe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -77,7 +77,7 @@ public class InstanceManager {
 	 * where a task manager is still considered alive.
 	 */
 	public InstanceManager() {
-		this(1000 * GlobalConfiguration.getLong(
+		this(GlobalConfiguration.getLong(
 				ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
 				ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
 	}
@@ -98,6 +98,10 @@ public class InstanceManager {
 
 		new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
 	}
+	
+	public long getHeartbeatTimeout() {
+		return heartbeatTimeout;
+	}
 
 	public void shutdown() {
 		synchronized (this.lock) {
@@ -126,7 +130,7 @@ public class InstanceManager {
 		
 		synchronized (this.lock) {
 			if (this.shutdown) {
-				throw new IllegalStateException("InstanceManager is shut down.");
+				return false;
 			}
 			
 			Instance host = registeredHostsById.get(instanceId);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 5a32244..4bce4a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -142,6 +142,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	
 	private final int defaultExecutionRetries;
 	
+	private final long delayBetweenRetries;
+	
 	private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
 	
 	private volatile boolean isShutDown;
@@ -179,6 +181,11 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		this.defaultExecutionRetries = GlobalConfiguration.getInteger(
 			ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY, ConfigConstants.DEFAULT_EXECUTION_RETRIES);
 
+		// delay between retries is twice the heartbeat timeout, so that task manager failures are detected before the restart
+		this.delayBetweenRetries = 2 * GlobalConfiguration.getLong(
+				ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT);
+		
 		// Load the job progress collector
 		this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
 
@@ -334,6 +341,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 
 				executionGraph.setNumberOfRetriesLeft(job.getNumberOfExecutionRetries() >= 0 ?
 						job.getNumberOfExecutionRetries() : this.defaultExecutionRetries);
+				executionGraph.setDelayBeforeRetrying(this.delayBetweenRetries);
 
 				ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph);
 				if (previous != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 34d86b4..ecdf891 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -43,6 +43,10 @@ public class InstanceManagerTest {
 		try {
 			InstanceManager cm = new InstanceManager();
 			
+			// Guards against unit mix-ups (some parts assuming config values in seconds, others
+			// in milliseconds) by verifying that the heartbeat timeout is below 2 minutes.
+			assertTrue(cm.getHeartbeatTimeout() < 2 * 60 * 1000);
+			
 			final int ipcPort = 10000;
 			final int dataPort = 20000;
 
@@ -182,13 +186,7 @@ public class InstanceManagerTest {
 				// expected
 			}
 			
-			try {
-				cm.reportHeartBeat(new InstanceID());
-				fail("Should raise exception in shutdown state");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
+			assertFalse(cm.reportHeartBeat(new InstanceID()));
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
index 168c454..cc767c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobManagerTestUtils.java
@@ -46,6 +46,8 @@ public class JobManagerTestUtils {
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10);
 		cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 		cfg.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
+		cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+		cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
 		
 		if (additionalParams != null) {
 			cfg.addAll(additionalParams);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f0fd8823/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
index 0b8518f..9e259f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/RecoveryITCase.java
@@ -200,8 +200,8 @@ public class RecoveryITCase {
 			
 			// make sure we have fast heartbeats and failure detection
 			Configuration cfg = new Configuration();
-			cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 2000);
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 500);
+			cfg.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 3000);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, 1000);
 			
 			jm = startJobManager(2, NUM_TASKS, cfg);