You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/17 23:02:50 UTC

[1/2] flink git commit: [hotfix] Make TaskManagerRunner shutdown asynchronously

Repository: flink
Updated Branches:
  refs/heads/master 15cdc5cc7 -> 4922ced71


[hotfix] Make TaskManagerRunner shutdown asynchronously


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4922ced7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4922ced7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4922ced7

Branch: refs/heads/master
Commit: 4922ced71a307a26b9f5070b41f72fd5d93b0ac8
Parents: c832f52
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 17 19:48:08 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 17 19:49:44 2018 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskManagerRunner.java | 90 ++++++++++++++------
 .../taskexecutor/TaskManagerRunnerTest.java     | 80 +++++++++++++++++
 2 files changed, 142 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4922ced7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 08335b2..2a775bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 
@@ -56,6 +58,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -69,10 +73,12 @@ import static org.apache.flink.util.Preconditions.checkState;
  * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
  * and starts them.
  */
-public class TaskManagerRunner implements FatalErrorHandler {
+public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
 
+	private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
+
 	private static final int STARTUP_FAILURE_RETURN_CODE = 1;
 
 	private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
@@ -98,6 +104,10 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
 	private final TaskExecutor taskManager;
 
+	private final CompletableFuture<Void> terminationFuture;
+
+	private boolean shutdown;
+
 	public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
 		this.configuration = checkNotNull(configuration);
 		this.resourceId = checkNotNull(resourceId);
@@ -137,6 +147,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			blobCacheService,
 			false,
 			this);
+
+		this.terminationFuture = new CompletableFuture<>();
+		this.shutdown = false;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -147,19 +160,37 @@ public class TaskManagerRunner implements FatalErrorHandler {
 		taskManager.start();
 	}
 
-	public void shutDown() throws Exception {
-		shutDownInternally();
-	}
-
-	protected void shutDownInternally() throws Exception {
-		Exception exception = null;
-
+	@Override
+	public CompletableFuture<Void> closeAsync() {
 		synchronized (lock) {
-			try {
+			if (!shutdown) {
+				shutdown = true;
+
 				taskManager.shutDown();
-			} catch (Exception e) {
-				exception = e;
+				final CompletableFuture<Void> taskManagerTerminationFuture = taskManager.getTerminationFuture();
+
+				final CompletableFuture<Void> serviceTerminationFuture = FutureUtils.composeAfterwards(
+					taskManagerTerminationFuture,
+					this::shutDownServices);
+
+				serviceTerminationFuture.whenComplete(
+					(Void ignored, Throwable throwable) -> {
+						if (throwable != null) {
+							terminationFuture.completeExceptionally(throwable);
+						} else {
+							terminationFuture.complete(null);
+						}
+					});
 			}
+		}
+
+		return terminationFuture;
+	}
+
+	private CompletableFuture<Void> shutDownServices() {
+		synchronized (lock) {
+			Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
+			Exception exception = null;
 
 			try {
 				blobCacheService.close();
@@ -174,32 +205,26 @@ public class TaskManagerRunner implements FatalErrorHandler {
 			}
 
 			try {
-				rpcService.stopService().get();
-			} catch (InterruptedException ie) {
-				exception = ExceptionUtils.firstOrSuppressed(ie, exception);
-
-				Thread.currentThread().interrupt();
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
-
-			try {
 				highAvailabilityServices.close();
 			} catch (Exception e) {
 				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
-			ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
+			terminationFutures.add(rpcService.stopService());
+
+			terminationFutures.add(ExecutorUtils.nonBlockingShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor));
 
 			if (exception != null) {
-				throw exception;
+				terminationFutures.add(FutureUtils.completedExceptionally(exception));
 			}
+
+			return FutureUtils.completeAll(terminationFutures);
 		}
 	}
 
 	// export the termination future for caller to know it is terminated
 	public CompletableFuture<Void> getTerminationFuture() {
-		return taskManager.getTerminationFuture();
+		return terminationFuture;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -210,12 +235,21 @@ public class TaskManagerRunner implements FatalErrorHandler {
 	public void onFatalError(Throwable exception) {
 		LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);
 
-		try {
-			shutDown();
-		} catch (Throwable t) {
-			LOG.error("Could not properly shut down TaskManager.", t);
+		if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) {
+			terminateJVM();
+		} else {
+			closeAsync();
+
+			FutureUtils.orTimeout(terminationFuture, FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+			terminationFuture.whenComplete(
+				(Void ignored, Throwable throwable) -> {
+					terminateJVM();
+				});
 		}
+	}
 
+	protected void terminateJVM() {
 		System.exit(RUNTIME_FAILURE_RETURN_CODE);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4922ced7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
new file mode 100644
index 0000000..f2f748d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.net.ServerSocket;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link TaskManagerRunner}.
+ */
+public class TaskManagerRunnerTest extends TestLogger {
+
+	@Test
+	public void testTaskManagerRunnerShutdown() throws Exception {
+		final Configuration configuration = new Configuration();
+		final ResourceID taskManagerResourceId = ResourceID.generate();
+
+		final ServerSocket localhost = new ServerSocket(0);
+
+		configuration.setString(JobManagerOptions.ADDRESS, localhost.getInetAddress().getHostName());
+		configuration.setInteger(JobManagerOptions.PORT, localhost.getLocalPort());
+		configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
+		final CompletableFuture<Void> jvmTerminationFuture = new CompletableFuture<>();
+		final TestingTaskManagerRunner taskManagerRunner = new TestingTaskManagerRunner(configuration, taskManagerResourceId, jvmTerminationFuture);
+
+		taskManagerRunner.start();
+
+		try {
+			// wait until we trigger the jvm termination
+			jvmTerminationFuture.get();
+
+			assertThat(taskManagerRunner.getTerminationFuture().isDone(), is(true));
+		} finally {
+			localhost.close();
+			taskManagerRunner.close();
+		}
+	}
+
+	private static class TestingTaskManagerRunner extends TaskManagerRunner {
+
+		private final CompletableFuture<Void> jvmTerminationFuture;
+
+		public TestingTaskManagerRunner(Configuration configuration, ResourceID resourceId, CompletableFuture<Void> jvmTerminationFuture) throws Exception {
+			super(configuration, resourceId);
+			this.jvmTerminationFuture = jvmTerminationFuture;
+		}
+
+		@Override
+		protected void terminateJVM() {
+			jvmTerminationFuture.complete(null);
+		}
+	}
+}


[2/2] flink git commit: [FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to JobMaster and TaskExecutor

Posted by tr...@apache.org.
[FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to JobMaster and TaskExecutor

If a timeout with the RM occurs on the JobMaster and TaskExecutor, then they will both try to reconnect
to the last known RM address.

Additionally, we now respect the TaskManagerOptions#REGISTRATION_TIMEOUT on the TaskExecutor. This means that
if the TaskExecutor could not register at a RM within the given registration timeout, it will fail with a
fatal exception. This allows the TaskExecutor process to fail if it cannot establish a connection,
which ultimately frees the occupied resources.

The commit also changes the default value for TaskManagerOptions#REGISTRATION_TIMEOUT from "Inf" to "5 min".

This closes #6035.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c832f52a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c832f52a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c832f52a

Branch: refs/heads/master
Commit: c832f52a9ca489b72e3eddcb51c288d23d66b571
Parents: 15cdc5c
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 17 14:44:14 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 17 19:49:44 2018 +0200

----------------------------------------------------------------------
 .../generated/task_manager_configuration.html   |  2 +-
 .../flink/configuration/TaskManagerOptions.java |  2 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 41 ++++++---
 .../runtime/taskexecutor/TaskExecutor.java      | 85 +++++++++++++----
 .../taskexecutor/TaskManagerConfiguration.java  |  7 +-
 .../RegistrationTimeoutException.java           | 41 +++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  | 20 ++--
 .../utils/TestingResourceManagerGateway.java    |  4 +
 .../runtime/taskexecutor/TaskExecutorTest.java  | 96 ++++++++++++++++++++
 9 files changed, 257 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/docs/_includes/generated/task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html
index fe999a8..fdb975f 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -154,7 +154,7 @@
         </tr>
         <tr>
             <td><h5>taskmanager.registration.timeout</h5></td>
-            <td style="word-wrap: break-word;">"Inf"</td>
+            <td style="word-wrap: break-word;">"5 min"</td>
             <td>Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.</td>
         </tr>
         <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b726e07..8017f7a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -139,7 +139,7 @@ public class TaskManagerOptions {
 	 */
 	public static final ConfigOption<String> REGISTRATION_TIMEOUT =
 		key("taskmanager.registration.timeout")
-			.defaultValue("Inf")
+			.defaultValue("5 min")
 			.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
 			.withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
 				" exceeded without a successful registration, then the TaskManager terminates.");

http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f1dbbb4..5f5a9a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1260,20 +1260,24 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		}
 
 		if (resourceManagerAddress != null) {
-			log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+			createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+		}
+	}
 
-			resourceManagerConnection = new ResourceManagerConnection(
-				log,
-				jobGraph.getJobID(),
-				resourceId,
-				getAddress(),
-				getFencingToken(),
-				resourceManagerAddress,
-				resourceManagerId,
-				scheduledExecutorService);
+	private void createResourceManagerConnection(String resourceManagerAddress, ResourceManagerId resourceManagerId) {
+		log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
 
-			resourceManagerConnection.start();
-		}
+		resourceManagerConnection = new ResourceManagerConnection(
+			log,
+			jobGraph.getJobID(),
+			resourceId,
+			getAddress(),
+			getFencingToken(),
+			resourceManagerAddress,
+			resourceManagerId,
+			scheduledExecutorService);
+
+		resourceManagerConnection.start();
 	}
 
 	private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
@@ -1605,9 +1609,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			runAsync(() -> {
 				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
-				closeResourceManagerConnection(
-					new TimeoutException(
-						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+				if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
+					final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
+					final ResourceManagerId resourceManagerId = establishedResourceManagerConnection.getResourceManagerId();
+
+					closeResourceManagerConnection(
+						new TimeoutException(
+							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+
+					createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+				}
 			});
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 6fdb2bd..e87c44d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -78,6 +78,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
@@ -101,6 +102,9 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -187,6 +191,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	private FileCache fileCache;
 
+	@Nullable
+	private UUID currentRegistrationTimeoutId;
+
 	public TaskExecutor(
 			RpcService rpcService,
 			TaskManagerConfiguration taskManagerConfiguration,
@@ -232,8 +239,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			rpcService.getScheduledExecutor(),
 			log);
 
-		hardwareDescription = HardwareDescription.extractFromSystem(
+		this.hardwareDescription = HardwareDescription.extractFromSystem(
 			taskExecutorServices.getMemoryManager().getMemorySize());
+
+		this.currentRegistrationTimeoutId = null;
 	}
 
 	// ------------------------------------------------------------------------
@@ -258,6 +267,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
 
 		fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
+
+		startRegistrationTimeout();
 	}
 
 	/**
@@ -269,7 +280,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		Throwable throwable = null;
 
-		if (isConnectedToResourceManager()) {
+		if (resourceManagerConnection != null) {
 			resourceManagerConnection.close();
 		}
 
@@ -876,24 +887,28 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		// establish a connection to the new leader
 		if (newLeaderAddress != null) {
-			log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
-			resourceManagerConnection =
-				new TaskExecutorToResourceManagerConnection(
-					log,
-					getRpcService(),
-					getAddress(),
-					getResourceID(),
-					taskSlotTable.createSlotReport(getResourceID()),
-					taskManagerLocation.dataPort(),
-					hardwareDescription,
-					newLeaderAddress,
-					newResourceManagerId,
-					getMainThreadExecutor(),
-					new ResourceManagerRegistrationListener());
-			resourceManagerConnection.start();
+			createResourceManagerConnection(newLeaderAddress, newResourceManagerId);
 		}
 	}
 
+	private void createResourceManagerConnection(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
+		log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
+		resourceManagerConnection =
+			new TaskExecutorToResourceManagerConnection(
+				log,
+				getRpcService(),
+				getAddress(),
+				getResourceID(),
+				taskSlotTable.createSlotReport(getResourceID()),
+				taskManagerLocation.dataPort(),
+				hardwareDescription,
+				newLeaderAddress,
+				newResourceManagerId,
+				getMainThreadExecutor(),
+				new ResourceManagerRegistrationListener());
+		resourceManagerConnection.start();
+	}
+
 	private void establishResourceManagerConnection(
 			ResourceID resourceManagerResourceId,
 			ClusterInformation clusterInformation) {
@@ -917,6 +932,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			clusterInformation.getBlobServerPort());
 
 		blobCacheService.setBlobServerAddress(blobServerAddress);
+
+		stopRegistrationTimeout();
 	}
 
 	private void closeResourceManagerConnection(Exception cause) {
@@ -947,6 +964,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
+
+		startRegistrationTimeout();
+	}
+
+	private void startRegistrationTimeout() {
+		final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
+
+		if (maxRegistrationDuration != null) {
+			final UUID newRegistrationTimeoutId = UUID.randomUUID();
+			currentRegistrationTimeoutId = newRegistrationTimeoutId;
+			scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
+		}
+	}
+
+	private void stopRegistrationTimeout() {
+		currentRegistrationTimeoutId = null;
+	}
+
+	private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
+		if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
+			final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
+
+			onFatalError(
+				new RegistrationTimeoutException(
+					String.format("Could not register at the ResourceManager within the specified maximum " +
+						"registration duration %s. This indicates a problem with this instance. Terminating now.",
+						maxRegistrationDuration)));
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -1556,9 +1601,15 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 				if (resourceManagerConnection != null && resourceManagerConnection.getResourceManagerId().equals(resourceId)) {
 					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
+					final String resourceManagerAddress = resourceManagerConnection.getTargetAddress();
+					final ResourceManagerId resourceManagerId = resourceManagerConnection.getTargetLeaderId();
+
 					closeResourceManagerConnection(
 						new TimeoutException(
 							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+
+					// try to reconnect to old ResourceManager
+					createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
 				} else {
 					log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index e8a7ae8..91beabf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -50,8 +50,11 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 	private final String[] tmpDirectories;
 
 	private final Time timeout;
+
 	// null indicates an infinite duration
+	@Nullable
 	private final Time maxRegistrationDuration;
+
 	private final Time initialRegistrationPause;
 	private final Time maxRegistrationPause;
 	private final Time refusedRegistrationPause;
@@ -74,7 +77,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		int numberSlots,
 		String[] tmpDirectories,
 		Time timeout,
-		Time maxRegistrationDuration,
+		@Nullable Time maxRegistrationDuration,
 		Time initialRegistrationPause,
 		Time maxRegistrationPause,
 		Time refusedRegistrationPause,
@@ -108,6 +111,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		return timeout;
 	}
 
+	@Nullable
 	public Time getMaxRegistrationDuration() {
 		return maxRegistrationDuration;
 	}
@@ -116,6 +120,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 		return initialRegistrationPause;
 	}
 
+	@Nullable
 	public Time getMaxRegistrationPause() {
 		return maxRegistrationPause;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
new file mode 100644
index 0000000..561089a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception which indicates that the {@link TaskExecutor} could not register at
+ * the master in time.
+ */
+public class RegistrationTimeoutException extends TaskManagerException {
+	private static final long serialVersionUID = -6377818046575001931L;
+
+	public RegistrationTimeoutException(String message) {
+		super(message);
+	}
+
+	public RegistrationTimeoutException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public RegistrationTimeoutException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f2de134..09640d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -90,6 +90,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -243,17 +244,21 @@ public class JobMasterTest extends TestLogger {
 			resourceManagerId,
 			rmResourceId,
 			fastHeartbeatInterval,
-			"localhost",
+			resourceManagerAddress,
 			"localhost");
 
 		final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> jobManagerRegistrationFuture = new CompletableFuture<>();
 		final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
+		final CountDownLatch registrationAttempts = new CountDownLatch(2);
 
-		resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> jobManagerRegistrationFuture.complete(
-			Tuple3.of(
-				tuple.f0,
-				tuple.f1,
-				tuple.f3)));
+		resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> {
+			jobManagerRegistrationFuture.complete(
+				Tuple3.of(
+					tuple.f0,
+					tuple.f1,
+					tuple.f3));
+			registrationAttempts.countDown();
+		});
 
 		resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
 
@@ -286,6 +291,9 @@ public class JobMasterTest extends TestLogger {
 
 			// heartbeat timeout should trigger disconnect JobManager from ResourceManager
 			assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
+
+			// the JobMaster should try to reconnect to the RM
+			registrationAttempts.await();
 		} finally {
 			jobManagerSharedServices.shutdown();
 			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 9b40414..106fd73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -109,6 +109,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 		this.requestSlotConsumer = null;
 	}
 
+	public ResourceID getOwnResourceId() { // accessor returning this testing gateway's ownResourceId field
+		return ownResourceId;
+	}
+
 	public void setRequestSlotFuture(CompletableFuture<Acknowledge> slotFuture) {
 		this.slotFutureReference.set(slotFuture);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6b3c8f6..ca31f76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -76,6 +78,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -87,6 +90,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -113,12 +117,15 @@ import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -302,9 +309,11 @@ public class TaskExecutorTest extends TestLogger {
 			new ClusterInformation("localhost", 1234));
 
 		final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
+		final CountDownLatch registrationAttempts = new CountDownLatch(2);
 		rmGateway.setRegisterTaskExecutorFunction(
 			registration -> {
 				taskExecutorRegistrationFuture.complete(registration.f1);
+				registrationAttempts.countDown();
 				return CompletableFuture.completedFuture(registrationResponse);
 			});
 
@@ -353,6 +362,9 @@ public class TaskExecutorTest extends TestLogger {
 			// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
 			assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));
 
+			// the TaskExecutor should try to reconnect to the RM
+			registrationAttempts.await();
+
 		} finally {
 			RpcUtils.terminateRpcEndpoint(taskManager, timeout);
 		}
@@ -1387,6 +1399,90 @@ public class TaskExecutorTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testMaximumRegistrationDuration() throws Exception { // expects a RegistrationTimeoutException at the fatal error handler; this test body never announces a ResourceManager leader
+		configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms"); // the registration timeout under test, set to a very small value
+
+		final TaskExecutor taskExecutor = new TaskExecutor(
+			rpc,
+			TaskManagerConfiguration.fromConfiguration(configuration), // built after the timeout option was set above
+			haServices,
+			new TaskManagerServicesBuilder().build(),
+			new HeartbeatServices(1000L, 1000L),
+			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			dummyBlobCacheService,
+			testingFatalErrorHandler);
+
+		taskExecutor.start();
+
+		try {
+			final Throwable error = testingFatalErrorHandler.getErrorFuture().get(); // blocks without a timeout until the handler's error future completes
+			assertThat(error, is(notNullValue()));
+			assertThat(ExceptionUtils.stripExecutionException(error), instanceOf(RegistrationTimeoutException.class)); // type is checked on the result of stripExecutionException, not on the raw error
+
+			testingFatalErrorHandler.clearError(); // NOTE(review): presumably clears the expected error so test teardown does not report it - confirm
+		} finally {
+			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); // always shut the endpoint down, even if an assertion failed
+		}
+	}
+
+	@Test
+	public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception { // first registration succeeds, later ones are declined; expects a RegistrationTimeoutException at the fatal error handler afterwards
+		configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "100 ms"); // the registration timeout under test
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+
+		final long heartbeatInterval = 10L; // also handed to the ResourceManager's registration response below
+		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
+		final TaskExecutor taskExecutor = new TaskExecutor(
+			rpc,
+			TaskManagerConfiguration.fromConfiguration(configuration),
+			haServices,
+			taskManagerServices,
+			new HeartbeatServices(heartbeatInterval, 10L), // NOTE(review): second argument is presumably the heartbeat timeout - confirm
+			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			dummyBlobCacheService,
+			testingFatalErrorHandler);
+
+		taskExecutor.start();
+
+		final CompletableFuture<ResourceID> registrationFuture = new CompletableFuture<>(); // completed by the first registration call only
+		final OneShotLatch secondRegistration = new OneShotLatch(); // triggered by any registration call after the first
+		try {
+			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+			testingResourceManagerGateway.setRegisterTaskExecutorFunction(
+				tuple -> {
+					if (registrationFuture.complete(tuple.f1)) { // complete() returns true only for the call that actually completes the future, i.e. the first registration
+						return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
+							new InstanceID(),
+							testingResourceManagerGateway.getOwnResourceId(),
+							heartbeatInterval,
+							new ClusterInformation("localhost", 1234)));
+					} else {
+						secondRegistration.trigger(); // signal the test thread that a re-registration was attempted
+						return CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the first registration should succeed."));
+					}
+				}
+			);
+			rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); // make the testing gateway reachable under its address
+
+			resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), UUID.randomUUID()); // announce the testing gateway as ResourceManager leader
+
+			final ResourceID registrationResourceId = registrationFuture.get(); // blocks without a timeout until the first registration arrives
+
+			assertThat(registrationResourceId, equalTo(taskManagerServices.getTaskManagerLocation().getResourceID())); // the registration must carry this TaskExecutor's own resource id
+
+			secondRegistration.await(); // NOTE(review): the re-registration presumably follows a heartbeat timeout against the testing gateway - confirm
+
+			final Throwable error = testingFatalErrorHandler.getErrorFuture().get(); // blocks without a timeout until the handler's error future completes
+			assertThat(error, is(notNullValue()));
+			assertThat(ExceptionUtils.stripExecutionException(error), instanceOf(RegistrationTimeoutException.class)); // type is checked on the result of stripExecutionException, not on the raw error
+
+			testingFatalErrorHandler.clearError(); // NOTE(review): presumably clears the expected error so test teardown does not report it - confirm
+		} finally {
+			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); // always shut the endpoint down, even if an assertion failed
+		}
+	}
+
 	private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService {
 		private final CompletableFuture<LeaderRetrievalListener> startFuture;