You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/23 12:59:10 UTC
[2/2] flink git commit: [FLINK-4354] [heartbeat] Implement heartbeat
logic between TaskManager and ResourceManager
[FLINK-4354] [heartbeat] Implement heartbeat logic between TaskManager and ResourceManager
This closes #3591.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd90672f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd90672f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd90672f
Branch: refs/heads/master
Commit: fd90672f9ccf7a0e02e5eb9c6251dc3d451ce8ba
Parents: d20fb09
Author: Zhijiang <wa...@aliyun.com>
Authored: Wed Mar 22 15:12:33 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Mar 23 13:58:44 2017 +0100
----------------------------------------------------------------------
.../heartbeat/TestingHeartbeatServices.java | 52 ++++++++
.../flink/runtime/jobmaster/JobMaster.java | 4 +-
.../flink/runtime/minicluster/MiniCluster.java | 5 +-
.../RegistrationConnectionListener.java | 40 ++++++
.../resourcemanager/ResourceManager.java | 117 ++++++++++++++----
.../resourcemanager/ResourceManagerGateway.java | 16 +++
.../resourcemanager/ResourceManagerRunner.java | 9 +-
.../StandaloneResourceManager.java | 5 +
.../runtime/taskexecutor/TaskExecutor.java | 106 ++++++++++++++--
.../taskexecutor/TaskExecutorGateway.java | 14 +++
.../TaskExecutorRegistrationSuccess.java | 16 ++-
...TaskExecutorToResourceManagerConnection.java | 21 +++-
.../clusterframework/ResourceManagerTest.java | 108 +++++++++++++++++
.../flink/runtime/jobmaster/JobMasterTest.java | 38 +-----
.../resourcemanager/ResourceManagerHATest.java | 7 ++
.../ResourceManagerJobMasterTest.java | 7 ++
.../ResourceManagerTaskExecutorTest.java | 7 ++
.../slotmanager/SlotProtocolTest.java | 17 +++
.../taskexecutor/TaskExecutorITCase.java | 3 +
.../runtime/taskexecutor/TaskExecutorTest.java | 121 +++++++++++++++++--
.../yarn/YarnFlinkApplicationMasterRunner.java | 21 ++--
.../apache/flink/yarn/YarnResourceManager.java | 8 +-
22 files changed, 640 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
new file mode 100644
index 0000000..e628db5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+public class TestingHeartbeatServices extends HeartbeatServices {
+
+ private final ScheduledExecutor scheduledExecutorToUse;
+
+ public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
+ super(heartbeatInterval, heartbeatTimeout);
+
+ this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
+ }
+
+ @Override
+ public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+ ResourceID resourceId,
+ HeartbeatListener<I, O> heartbeatListener,
+ ScheduledExecutor scheduledExecutor,
+ Logger log) {
+
+ return new HeartbeatManagerSenderImpl<>(
+ heartbeatInterval,
+ heartbeatTimeout,
+ resourceId,
+ heartbeatListener,
+ org.apache.flink.runtime.concurrent.Executors.directExecutor(),
+ scheduledExecutorToUse,
+ log);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 243b57f..81fc541 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1043,11 +1043,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@Override
public void notifyHeartbeatTimeout(ResourceID resourceID) {
- log.info("Task manager with id {} timed out.", resourceID);
+ log.info("Task manager with id {} heartbeat timed out.", resourceID);
getSelf().disconnectTaskManager(
resourceID,
- new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out."));
+ new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 25c4aba..2cfba7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -241,7 +241,7 @@ public class MiniCluster {
// bring up the ResourceManager(s)
LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
resourceManagerRunners = startResourceManagers(
- configuration, haServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+ configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
// bring up the TaskManager(s) for the mini cluster
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
@@ -508,6 +508,7 @@ public class MiniCluster {
protected ResourceManagerRunner[] startResourceManagers(
Configuration configuration,
HighAvailabilityServices haServices,
+ HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
int numResourceManagers,
RpcService[] resourceManagerRpcServices) throws Exception {
@@ -517,9 +518,11 @@ public class MiniCluster {
for (int i = 0; i < numResourceManagers; i++) {
resourceManagerRunners[i] = new ResourceManagerRunner(
+ ResourceID.generate(),
configuration,
resourceManagerRpcServices[i],
haServices,
+ heartbeatServices,
metricRegistry);
resourceManagerRunners[i].start();
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
new file mode 100644
index 0000000..360f982
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationConnectionListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+/**
+ * Classes which want to be notified about the registration result by the {@link RegisteredRpcConnection}
+ * have to implement this interface.
+ */
+public interface RegistrationConnectionListener<Success extends RegistrationResponse.Success> {
+
+ /**
+ * This method is called by the {@link RegisteredRpcConnection} when the registration succeeds.
+ *
+ * @param success The concrete response information for successful registration.
+ */
+ void onRegistrationSuccess(Success success);
+
+ /**
+ * This method is called by the {@link RegisteredRpcConnection} when the registration fails.
+ *
+ * @param failure The exception which causes the registration failure.
+ */
+ void onRegistrationFailure(Throwable failure);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 1430a49..9a7a790 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -30,6 +30,10 @@ import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.heartbeat.HeartbeatListener;
+import org.apache.flink.runtime.heartbeat.HeartbeatManager;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.InstanceID;
@@ -64,6 +68,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -81,6 +86,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
extends RpcEndpoint<ResourceManagerGateway>
implements LeaderContender {
+ /** Unique id of the resource manager */
+ private final ResourceID resourceId;
+
/** Configuration of the resource manager */
private final ResourceManagerConfiguration resourceManagerConfiguration;
@@ -96,6 +104,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
/** High availability services for leader retrieval and election. */
private final HighAvailabilityServices highAvailabilityServices;
+ /** The heartbeat manager used to monitor the registered task managers. */
+ private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
+
/** The factory to construct the SlotManager. */
private final SlotManagerFactory slotManagerFactory;
@@ -118,9 +129,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
public ResourceManager(
+ ResourceID resourceId,
RpcService rpcService,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
SlotManagerFactory slotManagerFactory,
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
@@ -128,6 +141,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
super(rpcService);
+ this.resourceId = checkNotNull(resourceId);
this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.slotManagerFactory = checkNotNull(slotManagerFactory);
@@ -135,6 +149,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+ this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
+ resourceId,
+ new TaskManagerHeartbeatListener(),
+ rpcService.getScheduledExecutor(),
+ log);
+
this.jobManagerRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
this.leaderSessionId = null;
@@ -178,6 +198,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
public void shutDown() throws Exception {
Exception exception = null;
+ taskManagerHeartbeatManager.stop();
+
try {
super.shutDown();
} catch (Exception e) {
@@ -326,7 +348,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
*
* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
* @param taskExecutorAddress The address of the TaskExecutor that registers
- * @param resourceID The resource ID of the TaskExecutor that registers
+ * @param taskExecutorResourceId The resource ID of the TaskExecutor that registers
*
* @return The response by the ResourceManager.
*/
@@ -334,7 +356,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
public Future<RegistrationResponse> registerTaskExecutor(
final UUID resourceManagerLeaderId,
final String taskExecutorAddress,
- final ResourceID resourceID,
+ final ResourceID taskExecutorResourceId,
final SlotReport slotReport) {
if (leaderSessionId.equals(resourceManagerLeaderId)) {
@@ -342,25 +364,37 @@ public abstract class ResourceManager<WorkerType extends Serializable>
return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@Override
- public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+ public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
- WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(resourceID);
+ WorkerRegistration<WorkerType> oldRegistration = taskExecutors.remove(taskExecutorResourceId);
if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself
- log.info("Replacing old instance of worker for ResourceID {}", resourceID);
+ log.info("Replacing old instance of worker for ResourceID {}", taskExecutorResourceId);
}
- WorkerType newWorker = workerStarted(resourceID);
+ WorkerType newWorker = workerStarted(taskExecutorResourceId);
WorkerRegistration<WorkerType> registration =
new WorkerRegistration<>(taskExecutorGateway, newWorker);
- taskExecutors.put(resourceID, registration);
- slotManager.registerTaskExecutor(resourceID, registration, slotReport);
+ taskExecutors.put(taskExecutorResourceId, registration);
+ slotManager.registerTaskExecutor(taskExecutorResourceId, registration, slotReport);
+
+ taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
+ @Override
+ public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+ // the task manager does not request heartbeats, so this method is currently never called
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID, Void payload) {
+ taskExecutorGateway.heartbeatFromResourceManager(resourceID);
+ }
+ });
return new TaskExecutorRegistrationSuccess(
- registration.getInstanceID(),
+ registration.getInstanceID(), resourceId,
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
}
}
@@ -368,7 +402,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
} else {
log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " +
"not equal the received leader session ID {}",
- resourceID, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);
+ taskExecutorResourceId, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);
return FlinkCompletableFuture.<RegistrationResponse>completed(
new RegistrationResponse.Decline("Discard registration because the leader id " +
@@ -377,6 +411,16 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
}
+ @RpcMethod
+ public void heartbeatFromTaskManager(final ResourceID resourceID) {
+ taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+ }
+
+ @RpcMethod
+ public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
+ closeTaskManagerConnection(resourceId, cause);
+ }
+
/**
* Requests a slot from the resource manager.
*
@@ -716,24 +760,24 @@ public abstract class ResourceManager<WorkerType extends Serializable>
* This method should be called by the framework once it detects that a currently registered
* task executor has failed.
*
- * @param resourceID Id of the worker that has failed.
- * @param message An informational message that explains why the worker failed.
+ * @param resourceID Id of the TaskManager that has failed.
+ * @param cause The exception which caused the TaskManager to fail.
*/
- public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+ public void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
+ taskManagerHeartbeatManager.unmonitorTarget(resourceID);
- if (workerRegistration != null) {
- log.info("Task manager {} failed because {}.", resourceID, message);
- // TODO :: suggest failed task executor to stop itself
- slotManager.notifyTaskManagerFailure(resourceID);
- } else {
- log.debug("Could not find a registered task manager with the process id {}.", resourceID);
- }
- }
- });
+ WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+
+ if (workerRegistration != null) {
+ log.info("Task manager {} failed because {}.", resourceID, cause);
+
+ // TODO :: suggest failed task executor to stop itself
+ slotManager.notifyTaskManagerFailure(resourceID);
+
+ workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+ } else {
+ log.debug("Could not find a registered task manager with the process id {}.", resourceID);
+ }
}
// ------------------------------------------------------------------------
@@ -827,5 +871,26 @@ public abstract class ResourceManager<WorkerType extends Serializable>
onFatalErrorAsync(error);
}
}
+
+ private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+
+ @Override
+ public void notifyHeartbeatTimeout(ResourceID resourceID) {
+ log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
+
+ closeTaskManagerConnection(resourceID, new TimeoutException(
+ "Task manager with id " + resourceID + " heartbeat timed out."));
+ }
+
+ @Override
+ public void reportPayload(ResourceID resourceID, Void payload) {
+ // nothing to do since there is no payload
+ }
+
+ @Override
+ public Future<Void> retrievePayload() {
+ return FlinkCompletableFuture.completed(null);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 8235ea7..7741e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -130,4 +130,20 @@ public interface ResourceManagerGateway extends RpcGateway {
* @return The future to the number of registered TaskManagers.
*/
Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
+
+ /**
+ * Sends a heartbeat from the task manager to the resource manager.
+ *
+ * @param resourceID unique id of the task manager
+ */
+ void heartbeatFromTaskManager(final ResourceID resourceID);
+
+ /**
+ * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
+ * {@link ResourceManager}.
+ *
+ * @param resourceID identifying the TaskManager to disconnect
+ * @param cause for the disconnection of the TaskManager
+ */
+ void disconnectTaskManager(ResourceID resourceID, Exception cause);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 73b27b5..d07e373 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -43,14 +45,18 @@ public class ResourceManagerRunner implements FatalErrorHandler {
private final ResourceManager<?> resourceManager;
public ResourceManagerRunner(
+ final ResourceID resourceId,
final Configuration configuration,
final RpcService rpcService,
final HighAvailabilityServices highAvailabilityServices,
+ final HeartbeatServices heartbeatServices,
final MetricRegistry metricRegistry) throws Exception {
+ Preconditions.checkNotNull(resourceId);
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(rpcService);
Preconditions.checkNotNull(highAvailabilityServices);
+ Preconditions.checkNotNull(heartbeatServices);
Preconditions.checkNotNull(metricRegistry);
final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
@@ -63,9 +69,11 @@ public class ResourceManagerRunner implements FatalErrorHandler {
rpcService.getScheduledExecutor());
this.resourceManager = new StandaloneResourceManager(
+ resourceId,
rpcService,
resourceManagerConfiguration,
highAvailabilityServices,
+ heartbeatServices,
resourceManagerRuntimeServices.getSlotManagerFactory(),
metricRegistry,
resourceManagerRuntimeServices.getJobLeaderIdService(),
@@ -87,7 +95,6 @@ public class ResourceManagerRunner implements FatalErrorHandler {
private void shutDownInternally() throws Exception {
Exception exception = null;
synchronized (lock) {
-
try {
resourceManager.shutDown();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 73c8a2d..e2d6538 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -37,17 +38,21 @@ import org.apache.flink.runtime.rpc.RpcService;
public class StandaloneResourceManager extends ResourceManager<ResourceID> {
public StandaloneResourceManager(
+ ResourceID resourceId,
RpcService rpcService,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
SlotManagerFactory slotManagerFactory,
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
+ resourceId,
rpcService,
resourceManagerConfiguration,
highAvailabilityServices,
+ heartbeatServices,
slotManagerFactory,
metricRegistry,
jobLeaderIdService,
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 83c225f..f3e1ff3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -57,6 +57,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -135,6 +136,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
/** The heartbeat manager for job manager in the task manager */
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
+ /** The heartbeat manager for resource manager in the task manager */
+ private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
+
/** The fatal error handler to use in case of a fatal error */
private final FatalErrorHandler fatalErrorHandler;
@@ -206,6 +210,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
new JobManagerHeartbeatListener(),
rpcService.getScheduledExecutor(),
log);
+
+ this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
+ getResourceID(),
+ new ResourceManagerHeartbeatListener(),
+ rpcService.getScheduledExecutor(),
+ log);
}
// ------------------------------------------------------------------------
@@ -247,6 +257,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
jobManagerHeartbeatManager.stop();
+ resourceManagerHeartbeatManager.stop();
+
ioManager.shutdown();
memoryManager.shutdown();
@@ -497,6 +509,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
+ @RpcMethod
+ public void heartbeatFromResourceManager(ResourceID resourceID) {
+ resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
+ }
+
// ----------------------------------------------------------------------
// Checkpointing RPCs
// ----------------------------------------------------------------------
@@ -619,11 +636,20 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId);
}
+ // ----------------------------------------------------------------------
+ // Disconnection RPCs
+ // ----------------------------------------------------------------------
+
@RpcMethod
public void disconnectJobManager(JobID jobId, Exception cause) {
closeJobManagerConnection(jobId, cause);
}
+ @RpcMethod
+ public void disconnectResourceManager(Exception cause) {
+ closeResourceManagerConnection(cause);
+ }
+
// ======================================================================
// Internal methods
// ======================================================================
@@ -665,11 +691,25 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
newLeaderAddress,
newLeaderId,
getMainThreadExecutor(),
- new ForwardingFatalErrorHandler());
+ new ResourceManagerRegistrationListener());
resourceManagerConnection.start();
}
}
+ private void closeResourceManagerConnection(Exception cause) {
+ log.info("Close ResourceManager connection for {}.", cause);
+
+ if (isConnectedToResourceManager()) {
+ resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+
+ ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+ resourceManagerConnection.close();
+ resourceManagerConnection = null;
+
+ resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+ }
+ }
+
// ------------------------------------------------------------------------
// Internal job manager connection methods
// ------------------------------------------------------------------------
@@ -747,10 +787,10 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
offerSlotsToJobManager(jobId);
} else {
log.warn("Slot offering to JobManager failed. Freeing the slots " +
- "and returning them to the ResourceManager.", throwable);
+ "and returning them to the ResourceManager.", throwable);
// We encountered an exception. Free the slots and return them to the RM.
- for (SlotOffer reservedSlot: reservedSlots) {
+ for (SlotOffer reservedSlot : reservedSlots) {
freeSlot(reservedSlot.getAllocationId(), throwable);
}
}
@@ -1137,11 +1177,32 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
- private final class ForwardingFatalErrorHandler implements FatalErrorHandler {
+ private final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorRegistrationSuccess> {
@Override
- public void onFatalError(Throwable exception) {
- onFatalErrorAsync(exception);
+ public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
+ final ResourceID resourceManagerId = success.getResourceManagerId();
+
+ // monitor the resource manager as heartbeat target
+ resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new HeartbeatTarget<Void>() {
+ @Override
+ public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+ if (isConnectedToResourceManager()) {
+ ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+ resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+ }
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID, Void payload) {
+ // request heartbeat will never be called on the task manager side
+ }
+ });
+ }
+
+ @Override
+ public void onRegistrationFailure(Throwable failure) {
+ onFatalErrorAsync(failure);
}
}
@@ -1216,15 +1277,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
runAsync(new Runnable() {
@Override
public void run() {
- log.info("The JobManager connection {} has timed out.", resourceID);
+ log.info("Job manager with id {} heartbeat timed out.", resourceID);
if (jobManagerConnections.containsKey(resourceID)) {
JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
if (jobManagerConnection != null) {
closeJobManagerConnection(
jobManagerConnection.getJobID(),
- new TimeoutException("The heartbeat of JobManager with id " +
- resourceID + " timed out."));
+ new TimeoutException("Job manager with id " + resourceID + " heartbeat timed out."));
}
}
}
@@ -1241,4 +1301,32 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
return FlinkCompletableFuture.completed(null);
}
}
+
+ private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
+
+ @Override
+ public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("Resource manager with id {} heartbeat timed out.", resourceID);
+
+ if (isConnectedToResourceManager() && resourceManagerConnection.getResourceManagerId().equals(resourceID)) {
+ closeResourceManagerConnection(
+ new TimeoutException("Resource manager with id " + resourceID + " heartbeat timed out."));
+ }
+ }
+ });
+ }
+
+ @Override
+ public void reportPayload(ResourceID resourceID, Void payload) {
+ // nothing to do since the payload is of type Void
+ }
+
+ @Override
+ public Future<Void> retrievePayload() {
+ return FlinkCompletableFuture.completed(null);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 2dcc3a4..2bbf0e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -141,10 +141,24 @@ public interface TaskExecutorGateway extends RpcGateway {
void heartbeatFromJobManager(ResourceID heartbeatOrigin);
/**
+ * Heartbeat request from the resource manager.
+ *
+ * @param heartbeatOrigin unique id of the resource manager
+ */
+ void heartbeatFromResourceManager(ResourceID heartbeatOrigin);
+
+ /**
* Disconnects the given JobManager from the TaskManager.
*
* @param jobId JobID for which the JobManager was the leader
* @param cause for the disconnection from the JobManager
*/
void disconnectJobManager(JobID jobId, Exception cause);
+
+ /**
+ * Disconnects the ResourceManager from the TaskManager.
+ *
+ * @param cause for the disconnection from the ResourceManager
+ */
+ void disconnectResourceManager(Exception cause);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
index b357f52..4b61f68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorRegistrationSuccess.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -33,16 +34,20 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.
private final InstanceID registrationId;
+ private final ResourceID resourceManagerResourceId;
+
private final long heartbeatInterval;
/**
* Create a new {@code TaskExecutorRegistrationSuccess} message.
*
* @param registrationId The ID that the ResourceManager assigned the registration.
+ * @param resourceManagerResourceId The unique ID that identifies the ResourceManager.
* @param heartbeatInterval The interval in which the ResourceManager will heartbeat the TaskExecutor.
*/
- public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) {
+ public TaskExecutorRegistrationSuccess(InstanceID registrationId, ResourceID resourceManagerResourceId, long heartbeatInterval) {
this.registrationId = registrationId;
+ this.resourceManagerResourceId = resourceManagerResourceId;
this.heartbeatInterval = heartbeatInterval;
}
@@ -54,6 +59,13 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.
}
/**
+ * Gets the unique ID that identifies the ResourceManager.
+ */
+ public ResourceID getResourceManagerId() {
+ return resourceManagerResourceId;
+ }
+
+ /**
* Gets the interval in which the ResourceManager will heartbeat the TaskExecutor.
*/
public long getHeartbeatInterval() {
@@ -62,7 +74,7 @@ public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.
@Override
public String toString() {
- return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')';
+ return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + resourceManagerResourceId + " / " + heartbeatInterval + ')';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 6e3e39b..775482c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
@@ -51,10 +51,12 @@ public class TaskExecutorToResourceManagerConnection
private final SlotReport slotReport;
- private final FatalErrorHandler fatalErrorHandler;
+ private final RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener;
private InstanceID registrationId;
+ private ResourceID resourceManagerResourceId;
+
public TaskExecutorToResourceManagerConnection(
Logger log,
RpcService rpcService,
@@ -64,7 +66,7 @@ public class TaskExecutorToResourceManagerConnection
String resourceManagerAddress,
UUID resourceManagerLeaderId,
Executor executor,
- FatalErrorHandler fatalErrorHandler) {
+ RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener) {
super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
@@ -72,7 +74,7 @@ public class TaskExecutorToResourceManagerConnection
this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId);
this.slotReport = Preconditions.checkNotNull(slotReport);
- this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
+ this.registrationListener = Preconditions.checkNotNull(registrationListener);
}
@@ -94,13 +96,15 @@ public class TaskExecutorToResourceManagerConnection
getTargetAddress(), success.getRegistrationId());
registrationId = success.getRegistrationId();
+ resourceManagerResourceId = success.getResourceManagerId();
+ registrationListener.onRegistrationSuccess(success);
}
@Override
protected void onRegistrationFailure(Throwable failure) {
log.info("Failed to register at resource manager {}.", getTargetAddress(), failure);
- fatalErrorHandler.onFatalError(failure);
+ registrationListener.onRegistrationFailure(failure);
}
/**
@@ -111,6 +115,13 @@ public class TaskExecutorToResourceManagerConnection
return registrationId;
}
+ /**
+ * Gets the unique id of the ResourceManager, which is returned when the registration succeeds.
+ */
+ public ResourceID getResourceManagerId() {
+ return resourceManagerResourceId;
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index ca8a07a..e7f2439 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -30,21 +31,47 @@ import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.TestingSlotManagerFactory;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import scala.Option;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
/**
* General tests for the resource manager component.
@@ -335,4 +362,85 @@ public class ResourceManagerTest {
}};
}};
}
+
+ @Test
+ public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
+ final String taskManagerAddress = "tm";
+ final ResourceID taskManagerResourceID = new ResourceID(taskManagerAddress);
+ final ResourceID resourceManagerResourceID = ResourceID.generate();
+ final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+
+ final TestingSerialRpcService rpcService = new TestingSerialRpcService();
+ rpcService.registerGateway(taskManagerAddress, taskExecutorGateway);
+
+ final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
+ Time.seconds(5L),
+ Time.seconds(5L));
+
+ final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+ final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+ highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+
+ final long heartbeatInterval = 1L;
+ final long heartbeatTimeout = 5L;
+ final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+ final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
+
+ final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+ final MetricRegistry metricRegistry = mock(MetricRegistry.class);
+ final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class);
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+ try {
+ final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
+ resourceManagerResourceID,
+ rpcService,
+ resourceManagerConfiguration,
+ highAvailabilityServices,
+ heartbeatServices,
+ slotManagerFactory,
+ metricRegistry,
+ jobLeaderIdService,
+ testingFatalErrorHandler);
+
+ resourceManager.start();
+
+ final UUID rmLeaderSessionId = UUID.randomUUID();
+ rmLeaderElectionService.isLeader(rmLeaderSessionId);
+
+ final SlotReport slotReport = new SlotReport();
+ // a successful registration response triggers monitoring of the heartbeat target and schedules heartbeat requests at the heartbeat interval
+ Future<RegistrationResponse> successfulFuture =
+ resourceManager.registerTaskExecutor(rmLeaderSessionId, taskManagerAddress, taskManagerResourceID, slotReport);
+ RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+ assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+ ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
+ heartbeatRunnableCaptor.capture(),
+ eq(0L),
+ eq(heartbeatInterval),
+ eq(TimeUnit.MILLISECONDS));
+
+ Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
+
+ ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+
+ Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
+
+ // run the first heartbeat request
+ heartbeatRunnable.run();
+
+ verify(taskExecutorGateway, times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID));
+
+ // run the timeout runnable to simulate a heartbeat timeout
+ timeoutRunnable.run();
+
+ verify(taskExecutorGateway).disconnectResourceManager(any(TimeoutException.class));
+
+ } finally {
+ rpcService.stopService();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 43536b6..73da244 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -26,10 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.heartbeat.HeartbeatListener;
-import org.apache.flink.runtime.heartbeat.HeartbeatManager;
-import org.apache.flink.runtime.heartbeat.HeartbeatManagerSenderImpl;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.*;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -38,14 +35,12 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
import java.net.InetAddress;
import java.net.URL;
@@ -108,10 +103,9 @@ public class JobMasterTest extends TestLogger {
testingFatalErrorHandler,
new FlinkUserCodeClassLoader(new URL[0]));
- // also start the heartbeat manager in job manager
jobMaster.start(jmLeaderId);
- // register task manager will trigger monitoring heartbeat target, schedule heartbeat request in interval time
+ // registering the task manager triggers monitoring of the heartbeat target and schedules heartbeat requests at the heartbeat interval
jobMaster.registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId);
ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -145,32 +139,4 @@ public class JobMasterTest extends TestLogger {
rpc.stopService();
}
}
-
- private static class TestingHeartbeatServices extends HeartbeatServices {
-
- private final ScheduledExecutor scheduledExecutorToUse;
-
- public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
- super(heartbeatInterval, heartbeatTimeout);
-
- this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
- }
-
- @Override
- public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
- ResourceID resourceId,
- HeartbeatListener<I, O> heartbeatListener,
- ScheduledExecutor scheduledExecutor,
- Logger log) {
-
- return new HeartbeatManagerSenderImpl<>(
- heartbeatInterval,
- heartbeatTimeout,
- resourceId,
- heartbeatListener,
- org.apache.flink.runtime.concurrent.Executors.directExecutor(),
- scheduledExecutorToUse,
- log);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 1aa799b..39594df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -40,12 +42,15 @@ public class ResourceManagerHATest {
@Test
public void testGrantAndRevokeLeadership() throws Exception {
+ ResourceID rmResourceId = ResourceID.generate();
RpcService rpcService = new TestingSerialRpcService();
TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+ HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
Time.seconds(5L),
Time.seconds(5L));
@@ -63,9 +68,11 @@ public class ResourceManagerHATest {
final ResourceManager resourceManager =
new StandaloneResourceManager(
+ rmResourceId,
rpcService,
resourceManagerConfiguration,
highAvailabilityServices,
+ heartbeatServices,
slotManagerFactory,
metricRegistry,
resourceManagerRuntimeServices.getJobLeaderIdService(),
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 9a68eca..0401f9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -196,10 +198,13 @@ public class ResourceManagerJobMasterTest {
JobID jobID,
TestingLeaderRetrievalService jobMasterLeaderRetrievalService,
FatalErrorHandler fatalErrorHandler) throws Exception {
+ ResourceID rmResourceId = ResourceID.generate();
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
+ HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
Time.seconds(5L),
Time.seconds(5L));
@@ -211,9 +216,11 @@ public class ResourceManagerJobMasterTest {
Time.minutes(5L));
ResourceManager resourceManager = new StandaloneResourceManager(
+ rmResourceId,
rpcService,
resourceManagerConfiguration,
highAvailabilityServices,
+ heartbeatServices,
slotManagerFactory,
metricRegistry,
jobLeaderIdService,
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 0a1addb..7c811d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -52,6 +53,8 @@ public class ResourceManagerTaskExecutorTest {
private ResourceID taskExecutorResourceID;
+ private ResourceID resourceManagerResourceID;
+
private StandaloneResourceManager resourceManager;
private UUID leaderSessionId;
@@ -63,6 +66,7 @@ public class ResourceManagerTaskExecutorTest {
rpcService = new TestingSerialRpcService();
taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+ resourceManagerResourceID = ResourceID.generate();
TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
testingFatalErrorHandler = new TestingFatalErrorHandler();
resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
@@ -144,6 +148,7 @@ public class ResourceManagerTaskExecutorTest {
private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+ HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
@@ -158,9 +163,11 @@ public class ResourceManagerTaskExecutorTest {
StandaloneResourceManager resourceManager =
new StandaloneResourceManager(
+ resourceManagerResourceID,
rpcService,
resourceManagerConfiguration,
highAvailabilityServices,
+ heartbeatServices,
slotManagerFactory,
metricRegistry,
jobLeaderIdService,
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index ea660f8..28ed697 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -98,6 +99,7 @@ public class SlotProtocolTest extends TestLogger {
final String rmAddress = "/rm1";
final String jmAddress = "/jm1";
final JobID jobID = new JobID();
+ final ResourceID rmResourceId = new ResourceID(rmAddress);
testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
@@ -117,11 +119,16 @@ public class SlotProtocolTest extends TestLogger {
Time.seconds(5L));
final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+
+ final HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
SpiedResourceManager resourceManager =
new SpiedResourceManager(
+ rmResourceId,
testRpcService,
resourceManagerConfiguration,
testingHaServices,
+ heartbeatServices,
slotManagerFactory,
mock(MetricRegistry.class),
jobLeaderIdService,
@@ -198,6 +205,7 @@ public class SlotProtocolTest extends TestLogger {
final String jmAddress = "/jm1";
final String tmAddress = "/tm1";
final JobID jobID = new JobID();
+ final ResourceID rmResourceId = new ResourceID(rmAddress);
testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
@@ -224,11 +232,16 @@ public class SlotProtocolTest extends TestLogger {
Time.seconds(5L));
TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
+
+ HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+
ResourceManager<ResourceID> resourceManager =
Mockito.spy(new StandaloneResourceManager(
+ rmResourceId,
testRpcService,
resourceManagerConfiguration,
testingHaServices,
+ heartbeatServices,
slotManagerFactory,
mock(MetricRegistry.class),
jobLeaderIdService,
@@ -302,17 +315,21 @@ public class SlotProtocolTest extends TestLogger {
private int startNewWorkerCalled = 0;
public SpiedResourceManager(
+ ResourceID resourceId,
RpcService rpcService,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
SlotManagerFactory slotManagerFactory,
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
+ resourceId,
rpcService,
resourceManagerConfiguration,
highAvailabilityServices,
+ heartbeatServices,
slotManagerFactory,
metricRegistry,
jobLeaderIdService,
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index f6c2dce..4e76486 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -87,6 +87,7 @@ public class TaskExecutorITCase {
final String rmAddress = "rm";
final String jmAddress = "jm";
final UUID jmLeaderId = UUID.randomUUID();
+ final ResourceID rmResourceId = new ResourceID(rmAddress);
final JobID jobId = new JobID();
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
@@ -119,9 +120,11 @@ public class TaskExecutorITCase {
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
+ rmResourceId,
rpcService,
resourceManagerConfiguration,
testingHAServices,
+ heartbeatServices,
slotManagerFactory,
metricRegistry,
jobLeaderIdService,
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 67196aa..d1f6e2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -213,9 +213,104 @@ public class TaskExecutorTest extends TestLogger {
}
@Test
+ public void testHeartbeatTimeoutWithResourceManager() throws Exception {
+ final String rmAddress = "rm";
+ final String tmAddress = "tm";
+ final ResourceID rmResourceId = new ResourceID(rmAddress);
+ final ResourceID tmResourceId = new ResourceID(tmAddress);
+ final UUID rmLeaderId = UUID.randomUUID();
+
+ // register the mock resource manager gateway
+ ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+ when(rmGateway.registerTaskExecutor(
+ any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+ new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));
+
+ final TestingSerialRpcService rpc = new TestingSerialRpcService();
+ rpc.registerGateway(rmAddress, rmGateway);
+
+ final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+ final TaskManagerConfiguration taskManagerConfiguration = mock(TaskManagerConfiguration.class);
+ when(taskManagerConfiguration.getNumberSlots()).thenReturn(1);
+
+ final TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+ when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);
+
+ final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+ final SlotReport slotReport = new SlotReport();
+ when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+ final long heartbeatTimeout = 10L;
+ HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+ when(heartbeatServices.createHeartbeatManager(
+ eq(taskManagerLocation.getResourceID()),
+ any(HeartbeatListener.class),
+ any(ScheduledExecutor.class),
+ any(Logger.class))).thenAnswer(
+ new Answer<HeartbeatManagerImpl<Void, Void>>() {
+ @Override
+ public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
+ return new HeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ taskManagerLocation.getResourceID(),
+ (HeartbeatListener<Void, Void>)invocation.getArguments()[1],
+ (Executor)invocation.getArguments()[2],
+ (ScheduledExecutor)invocation.getArguments()[2],
+ (Logger)invocation.getArguments()[3]);
+ }
+ }
+ );
+
+ try {
+ final TaskExecutor taskManager = new TaskExecutor(
+ taskManagerConfiguration,
+ taskManagerLocation,
+ rpc,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ haServices,
+ heartbeatServices,
+ mock(MetricRegistry.class),
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ mock(JobManagerTable.class),
+ mock(JobLeaderService.class),
+ testingFatalErrorHandler);
+
+ taskManager.start();
+
+ // define a leader and see that a registration happens
+ testLeaderService.notifyListener(rmAddress, rmLeaderId);
+
+ // a successful registration at the resource manager triggers heartbeat monitoring between tm and rm
+ verify(rmGateway).registerTaskExecutor(
+ eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
+
+ // a heartbeat timeout should trigger disconnecting the TaskManager from the ResourceManager
+ verify(rmGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
+
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowError();
+
+ } finally {
+ rpc.stopService();
+ }
+ }
+
+ @Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
final ResourceID resourceID = ResourceID.generate();
final String resourceManagerAddress = "/resource/manager/address/one";
+ final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
final TestingSerialRpcService rpc = new TestingSerialRpcService();
try {
@@ -223,7 +318,8 @@ public class TaskExecutorTest extends TestLogger {
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
when(rmGateway.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+ .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(
+ new InstanceID(), resourceManagerResourceId, 10L)));
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
@@ -275,12 +371,14 @@ public class TaskExecutorTest extends TestLogger {
@Test
public void testTriggerRegistrationOnLeaderChange() throws Exception {
- final ResourceID resourceID = ResourceID.generate();
+ final ResourceID tmResourceID = ResourceID.generate();
final String address1 = "/resource/manager/address/one";
final String address2 = "/resource/manager/address/two";
final UUID leaderId1 = UUID.randomUUID();
final UUID leaderId2 = UUID.randomUUID();
+ final ResourceID rmResourceId1 = new ResourceID(address1);
+ final ResourceID rmResourceId2 = new ResourceID(address2);
final TestingSerialRpcService rpc = new TestingSerialRpcService();
try {
@@ -291,11 +389,11 @@ public class TaskExecutorTest extends TestLogger {
when(rmGateway1.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
- new TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+ new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
when(rmGateway2.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
- new TaskExecutorRegistrationSuccess(new InstanceID(), 10L)));
+ new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
rpc.registerGateway(address1, rmGateway1);
rpc.registerGateway(address2, rmGateway2);
@@ -313,7 +411,7 @@ public class TaskExecutorTest extends TestLogger {
when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
- when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+ when(taskManagerLocation.getResourceID()).thenReturn(tmResourceID);
when(taskManagerLocation.getHostname()).thenReturn("foobar");
final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
@@ -350,7 +448,7 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(address1, leaderId1);
verify(rmGateway1).registerTaskExecutor(
- eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+ eq(leaderId1), eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
// cancel the leader
@@ -360,7 +458,7 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(address2, leaderId2);
verify(rmGateway2).registerTaskExecutor(
- eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
+ eq(leaderId2), eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
// check if a concurrent error occurred
@@ -531,6 +629,7 @@ public class TaskExecutorTest extends TestLogger {
final String resourceManagerAddress = "rm";
final UUID resourceManagerLeaderId = UUID.randomUUID();
+ final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
final InstanceID registrationId = new InstanceID();
@@ -540,7 +639,7 @@ public class TaskExecutorTest extends TestLogger {
any(String.class),
eq(resourceId),
any(SlotReport.class),
- any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+ any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
final String jobManagerAddress = "jm";
final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -638,6 +737,7 @@ public class TaskExecutorTest extends TestLogger {
final String resourceManagerAddress = "rm";
final UUID resourceManagerLeaderId = UUID.randomUUID();
+ final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
final String jobManagerAddress = "jm";
final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -655,7 +755,7 @@ public class TaskExecutorTest extends TestLogger {
any(String.class),
eq(resourceId),
any(SlotReport.class),
- any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+ any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
final int blobPort = 42;
@@ -844,6 +944,7 @@ public class TaskExecutorTest extends TestLogger {
final String resourceManagerAddress = "rm";
final UUID resourceManagerLeaderId = UUID.randomUUID();
+ final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
final String jobManagerAddress = "jm";
final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -862,7 +963,7 @@ public class TaskExecutorTest extends TestLogger {
eq(resourceId),
any(SlotReport.class),
any(Time.class))).thenReturn(
- FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+ FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
final int blobPort = 42;
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 7a0dbbe..ed672a3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -198,15 +198,18 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
haServices,
commonRpcService.getScheduledExecutor());
- return new YarnResourceManager(config,
- ENV,
- commonRpcService,
- resourceManagerConfiguration,
- haServices,
- resourceManagerRuntimeServices.getSlotManagerFactory(),
- metricRegistry,
- resourceManagerRuntimeServices.getJobLeaderIdService(),
- this);
+ return new YarnResourceManager(
+ ResourceID.generate(),
+ config,
+ ENV,
+ commonRpcService,
+ resourceManagerConfiguration,
+ haServices,
+ heartbeatServices,
+ resourceManagerRuntimeServices.getSlotManagerFactory(),
+ metricRegistry,
+ resourceManagerRuntimeServices.getJobLeaderIdService(),
+ this);
}
private JobManagerRunner createJobManagerRunner(Configuration config) throws Exception{
http://git-wip-us.apache.org/repos/asf/flink/blob/fd90672f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index ab96441..a308079 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
@@ -106,19 +107,23 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
public YarnResourceManager(
+ ResourceID resourceId,
Configuration flinkConfig,
Map<String, String> env,
RpcService rpcService,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
SlotManagerFactory slotManagerFactory,
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
+ resourceId,
rpcService,
resourceManagerConfiguration,
highAvailabilityServices,
+ heartbeatServices,
slotManagerFactory,
metricRegistry,
jobLeaderIdService,
@@ -231,7 +236,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
public void onContainersCompleted(List<ContainerStatus> list) {
for (ContainerStatus container : list) {
if (container.getExitStatus() < 0) {
- notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics());
+ closeTaskManagerConnection(new ResourceID(
+ container.getContainerId().toString()), new Exception(container.getDiagnostics()));
}
}
}