You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/17 23:02:50 UTC
[1/2] flink git commit: [hotfix] Make TaskManagerRunner shutdown
asynchronously
Repository: flink
Updated Branches:
refs/heads/master 15cdc5cc7 -> 4922ced71
[hotfix] Make TaskManagerRunner shut down asynchronously
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4922ced7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4922ced7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4922ced7
Branch: refs/heads/master
Commit: 4922ced71a307a26b9f5070b41f72fd5d93b0ac8
Parents: c832f52
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 17 19:48:08 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 17 19:49:44 2018 +0200
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskManagerRunner.java | 90 ++++++++++++++------
.../taskexecutor/TaskManagerRunnerTest.java | 80 +++++++++++++++++
2 files changed, 142 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4922ced7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 08335b2..2a775bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -47,6 +48,7 @@ import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
@@ -56,6 +58,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -69,10 +73,12 @@ import static org.apache.flink.util.Preconditions.checkState;
* It constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
* and starts them.
*/
-public class TaskManagerRunner implements FatalErrorHandler {
+public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
+ private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
+
private static final int STARTUP_FAILURE_RETURN_CODE = 1;
private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
@@ -98,6 +104,10 @@ public class TaskManagerRunner implements FatalErrorHandler {
private final TaskExecutor taskManager;
+ private final CompletableFuture<Void> terminationFuture;
+
+ private boolean shutdown;
+
public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
this.configuration = checkNotNull(configuration);
this.resourceId = checkNotNull(resourceId);
@@ -137,6 +147,9 @@ public class TaskManagerRunner implements FatalErrorHandler {
blobCacheService,
false,
this);
+
+ this.terminationFuture = new CompletableFuture<>();
+ this.shutdown = false;
}
// --------------------------------------------------------------------------------------------
@@ -147,19 +160,37 @@ public class TaskManagerRunner implements FatalErrorHandler {
taskManager.start();
}
- public void shutDown() throws Exception {
- shutDownInternally();
- }
-
- protected void shutDownInternally() throws Exception {
- Exception exception = null;
-
+ @Override
+ public CompletableFuture<Void> closeAsync() {
synchronized (lock) {
- try {
+ if (!shutdown) {
+ shutdown = true;
+
taskManager.shutDown();
- } catch (Exception e) {
- exception = e;
+ final CompletableFuture<Void> taskManagerTerminationFuture = taskManager.getTerminationFuture();
+
+ final CompletableFuture<Void> serviceTerminationFuture = FutureUtils.composeAfterwards(
+ taskManagerTerminationFuture,
+ this::shutDownServices);
+
+ serviceTerminationFuture.whenComplete(
+ (Void ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ terminationFuture.completeExceptionally(throwable);
+ } else {
+ terminationFuture.complete(null);
+ }
+ });
}
+ }
+
+ return terminationFuture;
+ }
+
+ private CompletableFuture<Void> shutDownServices() {
+ synchronized (lock) {
+ Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
+ Exception exception = null;
try {
blobCacheService.close();
@@ -174,32 +205,26 @@ public class TaskManagerRunner implements FatalErrorHandler {
}
try {
- rpcService.stopService().get();
- } catch (InterruptedException ie) {
- exception = ExceptionUtils.firstOrSuppressed(ie, exception);
-
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- exception = ExceptionUtils.firstOrSuppressed(e, exception);
- }
-
- try {
highAvailabilityServices.close();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
- ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor);
+ terminationFutures.add(rpcService.stopService());
+
+ terminationFutures.add(ExecutorUtils.nonBlockingShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor));
if (exception != null) {
- throw exception;
+ terminationFutures.add(FutureUtils.completedExceptionally(exception));
}
+
+ return FutureUtils.completeAll(terminationFutures);
}
}
// export the termination future for caller to know it is terminated
public CompletableFuture<Void> getTerminationFuture() {
- return taskManager.getTerminationFuture();
+ return terminationFuture;
}
// --------------------------------------------------------------------------------------------
@@ -210,12 +235,21 @@ public class TaskManagerRunner implements FatalErrorHandler {
public void onFatalError(Throwable exception) {
LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception);
- try {
- shutDown();
- } catch (Throwable t) {
- LOG.error("Could not properly shut down TaskManager.", t);
+ if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) {
+ terminateJVM();
+ } else {
+ closeAsync();
+
+ FutureUtils.orTimeout(terminationFuture, FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ terminationFuture.whenComplete(
+ (Void ignored, Throwable throwable) -> {
+ terminateJVM();
+ });
}
+ }
+ protected void terminateJVM() {
System.exit(RUNTIME_FAILURE_RETURN_CODE);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4922ced7/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
new file mode 100644
index 0000000..f2f748d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.net.ServerSocket;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link TaskManagerRunner}: on a fatal error the runner must finish shutting down before terminating the JVM.
+ */
+public class TaskManagerRunnerTest extends TestLogger {
+
+ @Test
+ public void testTaskManagerRunnerShutdown() throws Exception {
+ final Configuration configuration = new Configuration();
+ final ResourceID taskManagerResourceId = ResourceID.generate();
+
+ final ServerSocket localhost = new ServerSocket(0); // free port used as the JobManager address; the test never accepts on it
+
+ configuration.setString(JobManagerOptions.ADDRESS, localhost.getInetAddress().getHostName()); // NOTE(review): ServerSocket(0) binds the wildcard address, so this host name may not be connectable — confirm
+ configuration.setInteger(JobManagerOptions.PORT, localhost.getLocalPort());
+ configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms"); // short timeout so the TaskExecutor's registration timeout fires quickly
+ final CompletableFuture<Void> jvmTerminationFuture = new CompletableFuture<>();
+ final TestingTaskManagerRunner taskManagerRunner = new TestingTaskManagerRunner(configuration, taskManagerResourceId, jvmTerminationFuture);
+
+ taskManagerRunner.start();
+
+ try {
+ // wait until the (presumably registration-timeout-induced) fatal error makes the runner call terminateJVM()
+ jvmTerminationFuture.get();
+
+ assertThat(taskManagerRunner.getTerminationFuture().isDone(), is(true)); // non-JVM-fatal errors call terminateJVM() only from a whenComplete callback on this future
+ } finally {
+ localhost.close();
+ taskManagerRunner.close();
+ }
+ }
+
+ private static class TestingTaskManagerRunner extends TaskManagerRunner {
+
+ private final CompletableFuture<Void> jvmTerminationFuture;
+
+ public TestingTaskManagerRunner(Configuration configuration, ResourceID resourceId, CompletableFuture<Void> jvmTerminationFuture) throws Exception {
+ super(configuration, resourceId);
+ this.jvmTerminationFuture = jvmTerminationFuture;
+ }
+
+ @Override
+ protected void terminateJVM() {
+ jvmTerminationFuture.complete(null); // record the call instead of the base class's System.exit(...)
+ }
+ }
+}
[2/2] flink git commit: [FLINK-6160] Add reconnection attempts in
case of heartbeat timeouts to JobMaster and TaskExecutor
Posted by tr...@apache.org.
[FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to JobMaster and TaskExecutor
If a heartbeat timeout with the RM occurs on the JobMaster or the TaskExecutor, then the affected component will try to reconnect
to the last known RM address.
Additionally, we now respect TaskManagerOptions#REGISTRATION_TIMEOUT on the TaskExecutor. This means that
if the TaskExecutor could not register at a RM within the given registration timeout, it will fail with a
fatal exception. This allows the TaskExecutor process to fail in case it cannot establish a connection
and ultimately frees the occupied resources.
The commit also changes the default value for TaskManagerOptions#REGISTRATION_TIMEOUT from "Inf" to "5 min".
This closes #6035.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c832f52a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c832f52a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c832f52a
Branch: refs/heads/master
Commit: c832f52a9ca489b72e3eddcb51c288d23d66b571
Parents: 15cdc5c
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 17 14:44:14 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 17 19:49:44 2018 +0200
----------------------------------------------------------------------
.../generated/task_manager_configuration.html | 2 +-
.../flink/configuration/TaskManagerOptions.java | 2 +-
.../flink/runtime/jobmaster/JobMaster.java | 41 ++++++---
.../runtime/taskexecutor/TaskExecutor.java | 85 +++++++++++++----
.../taskexecutor/TaskManagerConfiguration.java | 7 +-
.../RegistrationTimeoutException.java | 41 +++++++++
.../flink/runtime/jobmaster/JobMasterTest.java | 20 ++--
.../utils/TestingResourceManagerGateway.java | 4 +
.../runtime/taskexecutor/TaskExecutorTest.java | 96 ++++++++++++++++++++
9 files changed, 257 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/docs/_includes/generated/task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html
index fe999a8..fdb975f 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -154,7 +154,7 @@
</tr>
<tr>
<td><h5>taskmanager.registration.timeout</h5></td>
- <td style="word-wrap: break-word;">"Inf"</td>
+ <td style="word-wrap: break-word;">"5 min"</td>
<td>Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.</td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b726e07..8017f7a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -139,7 +139,7 @@ public class TaskManagerOptions {
*/
public static final ConfigOption<String> REGISTRATION_TIMEOUT =
key("taskmanager.registration.timeout")
- .defaultValue("Inf")
+ .defaultValue("5 min")
.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
.withDescription("Defines the timeout for the TaskManager registration. If the duration is" +
" exceeded without a successful registration, then the TaskManager terminates.");
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f1dbbb4..5f5a9a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1260,20 +1260,24 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
if (resourceManagerAddress != null) {
- log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+ createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+ }
+ }
- resourceManagerConnection = new ResourceManagerConnection(
- log,
- jobGraph.getJobID(),
- resourceId,
- getAddress(),
- getFencingToken(),
- resourceManagerAddress,
- resourceManagerId,
- scheduledExecutorService);
+ private void createResourceManagerConnection(String resourceManagerAddress, ResourceManagerId resourceManagerId) {
+ log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
- resourceManagerConnection.start();
- }
+ resourceManagerConnection = new ResourceManagerConnection(
+ log,
+ jobGraph.getJobID(),
+ resourceId,
+ getAddress(),
+ getFencingToken(),
+ resourceManagerAddress,
+ resourceManagerId,
+ scheduledExecutorService);
+
+ resourceManagerConnection.start();
}
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
@@ -1605,9 +1609,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
runAsync(() -> {
log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
- closeResourceManagerConnection(
- new TimeoutException(
- "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+ if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
+ final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
+ final ResourceManagerId resourceManagerId = establishedResourceManagerConnection.getResourceManagerId();
+
+ closeResourceManagerConnection(
+ new TimeoutException(
+ "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+
+ createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+ }
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 6fdb2bd..e87c44d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -78,6 +78,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
@@ -101,6 +102,9 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -187,6 +191,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private FileCache fileCache;
+ @Nullable
+ private UUID currentRegistrationTimeoutId;
+
public TaskExecutor(
RpcService rpcService,
TaskManagerConfiguration taskManagerConfiguration,
@@ -232,8 +239,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
rpcService.getScheduledExecutor(),
log);
- hardwareDescription = HardwareDescription.extractFromSystem(
+ this.hardwareDescription = HardwareDescription.extractFromSystem(
taskExecutorServices.getMemoryManager().getMemorySize());
+
+ this.currentRegistrationTimeoutId = null;
}
// ------------------------------------------------------------------------
@@ -258,6 +267,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
+
+ startRegistrationTimeout();
}
/**
@@ -269,7 +280,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
Throwable throwable = null;
- if (isConnectedToResourceManager()) {
+ if (resourceManagerConnection != null) {
resourceManagerConnection.close();
}
@@ -876,24 +887,28 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
// establish a connection to the new leader
if (newLeaderAddress != null) {
- log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
- resourceManagerConnection =
- new TaskExecutorToResourceManagerConnection(
- log,
- getRpcService(),
- getAddress(),
- getResourceID(),
- taskSlotTable.createSlotReport(getResourceID()),
- taskManagerLocation.dataPort(),
- hardwareDescription,
- newLeaderAddress,
- newResourceManagerId,
- getMainThreadExecutor(),
- new ResourceManagerRegistrationListener());
- resourceManagerConnection.start();
+ createResourceManagerConnection(newLeaderAddress, newResourceManagerId);
}
}
+ private void createResourceManagerConnection(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
+ log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
+ resourceManagerConnection =
+ new TaskExecutorToResourceManagerConnection(
+ log,
+ getRpcService(),
+ getAddress(),
+ getResourceID(),
+ taskSlotTable.createSlotReport(getResourceID()),
+ taskManagerLocation.dataPort(),
+ hardwareDescription,
+ newLeaderAddress,
+ newResourceManagerId,
+ getMainThreadExecutor(),
+ new ResourceManagerRegistrationListener());
+ resourceManagerConnection.start();
+ }
+
private void establishResourceManagerConnection(
ResourceID resourceManagerResourceId,
ClusterInformation clusterInformation) {
@@ -917,6 +932,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
clusterInformation.getBlobServerPort());
blobCacheService.setBlobServerAddress(blobServerAddress);
+
+ stopRegistrationTimeout();
}
private void closeResourceManagerConnection(Exception cause) {
@@ -947,6 +964,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
resourceManagerConnection.close();
resourceManagerConnection = null;
}
+
+ startRegistrationTimeout();
+ }
+
+ private void startRegistrationTimeout() {
+ final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
+
+ if (maxRegistrationDuration != null) {
+ final UUID newRegistrationTimeoutId = UUID.randomUUID();
+ currentRegistrationTimeoutId = newRegistrationTimeoutId;
+ scheduleRunAsync(() -> registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
+ }
+ }
+
+ private void stopRegistrationTimeout() {
+ currentRegistrationTimeoutId = null;
+ }
+
+ private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
+ if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) {
+ final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration();
+
+ onFatalError(
+ new RegistrationTimeoutException(
+ String.format("Could not register at the ResourceManager within the specified maximum " +
+ "registration duration %s. This indicates a problem with this instance. Terminating now.",
+ maxRegistrationDuration)));
+ }
}
// ------------------------------------------------------------------------
@@ -1556,9 +1601,15 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
if (resourceManagerConnection != null && resourceManagerConnection.getResourceManagerId().equals(resourceId)) {
log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+ final String resourceManagerAddress = resourceManagerConnection.getTargetAddress();
+ final ResourceManagerId resourceManagerId = resourceManagerConnection.getTargetLeaderId();
+
closeResourceManagerConnection(
new TimeoutException(
"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+
+ // try to reconnect to old ResourceManager
+ createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
} else {
log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index e8a7ae8..91beabf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -50,8 +50,11 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
private final String[] tmpDirectories;
private final Time timeout;
+
// null indicates an infinite duration
+ @Nullable
private final Time maxRegistrationDuration;
+
private final Time initialRegistrationPause;
private final Time maxRegistrationPause;
private final Time refusedRegistrationPause;
@@ -74,7 +77,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
int numberSlots,
String[] tmpDirectories,
Time timeout,
- Time maxRegistrationDuration,
+ @Nullable Time maxRegistrationDuration,
Time initialRegistrationPause,
Time maxRegistrationPause,
Time refusedRegistrationPause,
@@ -108,6 +111,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
return timeout;
}
+ @Nullable
public Time getMaxRegistrationDuration() {
return maxRegistrationDuration;
}
@@ -116,6 +120,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
return initialRegistrationPause;
}
+ @Nullable
public Time getMaxRegistrationPause() {
return maxRegistrationPause;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
new file mode 100644
index 0000000..561089a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception which indicates that the {@link TaskExecutor} could not register at the ResourceManager
+ * within the configured maximum registration duration (taskmanager.registration.timeout).
+ */
+public class RegistrationTimeoutException extends TaskManagerException {
+ private static final long serialVersionUID = -6377818046575001931L;
+
+ public RegistrationTimeoutException(String message) {
+ super(message);
+ }
+
+ public RegistrationTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RegistrationTimeoutException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f2de134..09640d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -90,6 +90,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -243,17 +244,21 @@ public class JobMasterTest extends TestLogger {
resourceManagerId,
rmResourceId,
fastHeartbeatInterval,
- "localhost",
+ resourceManagerAddress,
"localhost");
final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> jobManagerRegistrationFuture = new CompletableFuture<>();
final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
+ final CountDownLatch registrationAttempts = new CountDownLatch(2);
- resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> jobManagerRegistrationFuture.complete(
- Tuple3.of(
- tuple.f0,
- tuple.f1,
- tuple.f3)));
+ resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> {
+ jobManagerRegistrationFuture.complete(
+ Tuple3.of(
+ tuple.f0,
+ tuple.f1,
+ tuple.f3));
+ registrationAttempts.countDown();
+ });
resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
@@ -286,6 +291,9 @@ public class JobMasterTest extends TestLogger {
// heartbeat timeout should trigger disconnect JobManager from ResourceManager
assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
+
+ // the JobMaster should try to reconnect to the RM
+ registrationAttempts.await();
} finally {
jobManagerSharedServices.shutdown();
RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 9b40414..106fd73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -109,6 +109,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
this.requestSlotConsumer = null;
}
+ public ResourceID getOwnResourceId() {
+ return ownResourceId;
+ }
+
public void setRequestSlotFuture(CompletableFuture<Acknowledge> slotFuture) {
this.slotFutureReference.set(slotFuture);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6b3c8f6..ca31f76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -76,6 +78,7 @@ import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -87,6 +90,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -113,12 +117,15 @@ import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -302,9 +309,11 @@ public class TaskExecutorTest extends TestLogger {
new ClusterInformation("localhost", 1234));
final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>();
+ final CountDownLatch registrationAttempts = new CountDownLatch(2);
rmGateway.setRegisterTaskExecutorFunction(
registration -> {
taskExecutorRegistrationFuture.complete(registration.f1);
+ registrationAttempts.countDown();
return CompletableFuture.completedFuture(registrationResponse);
});
@@ -353,6 +362,9 @@ public class TaskExecutorTest extends TestLogger {
// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));
+ // the TaskExecutor should try to reconnect to the RM
+ registrationAttempts.await();
+
} finally {
RpcUtils.terminateRpcEndpoint(taskManager, timeout);
}
@@ -1387,6 +1399,90 @@ public class TaskExecutorTest extends TestLogger {
}
}
+ /**
+ * Starts a TaskExecutor with a registration timeout of 10 ms and never announces a
+ * ResourceManager leader in this test. It then checks that the fatal error handler receives
+ * an error that, after unwrapping, is a {@link RegistrationTimeoutException}.
+ */
+ @Test
+ public void testMaximumRegistrationDuration() throws Exception {
+ // very short timeout so that the registration deadline expires quickly
+ configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+ TaskManagerConfiguration.fromConfiguration(configuration),
+ haServices,
+ new TaskManagerServicesBuilder().build(),
+ new HeartbeatServices(1000L, 1000L),
+ UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ dummyBlobCacheService,
+ testingFatalErrorHandler);
+
+ taskExecutor.start();
+
+ try {
+ // NOTE(review): get() has no timeout; this relies on a test-level timeout (if any) to avoid hanging
+ final Throwable error = testingFatalErrorHandler.getErrorFuture().get();
+ assertThat(error, is(notNullValue()));
+ // the reported error may be wrapped, so unwrap ExecutionException layers before the type check
+ assertThat(ExceptionUtils.stripExecutionException(error), instanceOf(RegistrationTimeoutException.class));
+
+ // presumably clears the expected error so the shared handler does not flag it later — confirm against fixture teardown
+ testingFatalErrorHandler.clearError();
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ }
+ }
+
+ /**
+ * Tests that the registration timeout is enforced again after a successful registration
+ * was lost. The testing ResourceManager accepts only the first registration of the
+ * TaskExecutor and declines every later one. The test waits for a second registration
+ * attempt and then expects a {@link RegistrationTimeoutException} at the fatal error handler.
+ *
+ * <p>NOTE(review): the connection loss is presumably induced by the 10 ms heartbeat
+ * interval/timeout, with the testing gateway not answering heartbeats — confirm.
+ */
+ @Test
+ public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception {
+ configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "100 ms");
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+
+ final long heartbeatInterval = 10L;
+ final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+ TaskManagerConfiguration.fromConfiguration(configuration),
+ haServices,
+ taskManagerServices,
+ new HeartbeatServices(heartbeatInterval, 10L),
+ UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ dummyBlobCacheService,
+ testingFatalErrorHandler);
+
+ taskExecutor.start();
+
+ // completed by the first registration attempt; carries the registering TaskExecutor's resource ID
+ final CompletableFuture<ResourceID> registrationFuture = new CompletableFuture<>();
+ // triggered by any registration attempt after the first one
+ final OneShotLatch secondRegistration = new OneShotLatch();
+ try {
+ final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+ testingResourceManagerGateway.setRegisterTaskExecutorFunction(
+ tuple -> {
+ // complete() returns true only for the call that completes the future, so only the first attempt succeeds
+ if (registrationFuture.complete(tuple.f1)) {
+ return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
+ new InstanceID(),
+ testingResourceManagerGateway.getOwnResourceId(),
+ heartbeatInterval,
+ new ClusterInformation("localhost", 1234)));
+ } else {
+ // a repeated attempt means the executor is re-registering; decline it so the registration timeout can expire
+ secondRegistration.trigger();
+ return CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the first registration should succeed."));
+ }
+ }
+ );
+ rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+
+ // announce the testing gateway as ResourceManager leader, which starts the first registration
+ resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), UUID.randomUUID());
+
+ // NOTE(review): get()/await() below have no timeout; this relies on a test-level timeout (if any)
+ final ResourceID registrationResourceId = registrationFuture.get();
+
+ // the first registration must come from this executor's own resource ID
+ assertThat(registrationResourceId, equalTo(taskManagerServices.getTaskManagerLocation().getResourceID()));
+
+ // wait until the executor tries to register again after losing the first connection
+ secondRegistration.await();
+
+ final Throwable error = testingFatalErrorHandler.getErrorFuture().get();
+ assertThat(error, is(notNullValue()));
+ // the reported error may be wrapped, so unwrap ExecutionException layers before the type check
+ assertThat(ExceptionUtils.stripExecutionException(error), instanceOf(RegistrationTimeoutException.class));
+
+ // presumably clears the expected error so the shared handler does not flag it later — confirm against fixture teardown
+ testingFatalErrorHandler.clearError();
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ }
+ }
+
private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService {
private final CompletableFuture<LeaderRetrievalListener> startFuture;