You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:18 UTC
[07/50] [abbrv] flink git commit: [FLINK-4354] [heartbeat] Add heartbeats between the ResourceManager and TaskExecutor
[FLINK-4354] [heartbeat] Add heartbeats between the ResourceManager and TaskExecutor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b99f8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b99f8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b99f8a
Branch: refs/heads/table-retraction
Commit: 83b99f8a624ddf35deb934b4d4358582657998c6
Parents: fd90672
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Mar 22 12:03:45 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Mar 23 13:58:44 2017 +0100
----------------------------------------------------------------------
.../heartbeat/TestingHeartbeatManagerImpl.java | 63 -----------------
.../TestingHeartbeatManagerSenderImpl.java | 61 -----------------
.../heartbeat/TestingHeartbeatServices.java | 52 --------------
.../flink/runtime/jobmaster/JobMaster.java | 4 +-
.../flink/runtime/minicluster/MiniCluster.java | 7 +-
.../resourcemanager/ResourceManager.java | 71 +++++++++++---------
.../resourcemanager/ResourceManagerGateway.java | 7 +-
.../resourcemanager/ResourceManagerRunner.java | 2 +-
.../StandaloneResourceManager.java | 4 +-
.../runtime/taskexecutor/TaskExecutor.java | 63 ++++++++++-------
.../clusterframework/ResourceManagerTest.java | 6 +-
.../heartbeat/TestingHeartbeatServices.java | 52 ++++++++++++++
.../flink/runtime/jobmaster/JobMasterTest.java | 10 ++-
.../resourcemanager/ResourceManagerHATest.java | 3 +-
.../ResourceManagerJobMasterTest.java | 6 +-
.../ResourceManagerTaskExecutorTest.java | 7 +-
.../slotmanager/SlotProtocolTest.java | 6 +-
.../taskexecutor/TaskExecutorITCase.java | 3 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 16 +++--
.../src/test/resources/log4j-test.properties | 2 +-
.../yarn/YarnFlinkApplicationMasterRunner.java | 3 +-
.../apache/flink/yarn/YarnResourceManager.java | 4 +-
22 files changed, 181 insertions(+), 271 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
deleted file mode 100644
index a6e056d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.slf4j.Logger;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-
-/**
- * Heartbeat manager implementation which extends {@link HeartbeatManagerImpl} for testing.
- * It overrides the {@link #unmonitorTarget(ResourceID)} to wait for some tests complete
- * when notify heartbeat timeout.
- *
- * @param <I> Type of the incoming heartbeat payload
- * @param <O> Type of the outgoing heartbeat payload
- */
-public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
-
- private final CountDownLatch waitLatch;
-
- public TestingHeartbeatManagerImpl(
- CountDownLatch waitLatch,
- long heartbeatTimeoutIntervalMs,
- ResourceID ownResourceID,
- HeartbeatListener<I, O> heartbeatListener,
- Executor executor,
- ScheduledExecutor scheduledExecutor,
- Logger log) {
-
- super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
-
- this.waitLatch = waitLatch;
- }
-
- @Override
- public void unmonitorTarget(ResourceID resourceID) {
- try {
- waitLatch.await();
- } catch (InterruptedException ex) {
- log.error("Unexpected interrupted exception.", ex);
- }
-
- super.unmonitorTarget(resourceID);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
deleted file mode 100644
index 36f7e96..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.slf4j.Logger;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-
-/**
- *
- * @param <I>
- * @param <O>
- */
-public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSenderImpl<I, O> {
-
- private final CountDownLatch waitLatch;
-
- public TestingHeartbeatManagerSenderImpl(
- CountDownLatch waitLatch,
- long heartbeatPeriod,
- long heartbeatTimeout,
- ResourceID ownResourceID,
- HeartbeatListener<I, O> heartbeatListener,
- Executor executor,
- ScheduledExecutor scheduledExecutor,
- Logger log) {
-
- super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
-
- this.waitLatch = waitLatch;
- }
-
- @Override
- public void unmonitorTarget(ResourceID resourceID) {
- try {
- waitLatch.await();
- } catch (InterruptedException ex) {
- log.error("Unexpected interrupted exception.", ex);
- }
-
- super.unmonitorTarget(resourceID);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
deleted file mode 100644
index e628db5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-
-public class TestingHeartbeatServices extends HeartbeatServices {
-
- private final ScheduledExecutor scheduledExecutorToUse;
-
- public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
- super(heartbeatInterval, heartbeatTimeout);
-
- this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
- }
-
- @Override
- public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
- ResourceID resourceId,
- HeartbeatListener<I, O> heartbeatListener,
- ScheduledExecutor scheduledExecutor,
- Logger log) {
-
- return new HeartbeatManagerSenderImpl<>(
- heartbeatInterval,
- heartbeatTimeout,
- resourceId,
- heartbeatListener,
- org.apache.flink.runtime.concurrent.Executors.directExecutor(),
- scheduledExecutorToUse,
- log);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 81fc541..080b48e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1043,11 +1043,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@Override
public void notifyHeartbeatTimeout(ResourceID resourceID) {
- log.info("Task manager with id {} heartbeat timed out.", resourceID);
+ log.info("Heartbeat of TaskManager with id {} timed out.", resourceID);
getSelf().disconnectTaskManager(
resourceID,
- new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
+ new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2cfba7b..9d5f9d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -241,7 +241,12 @@ public class MiniCluster {
// bring up the ResourceManager(s)
LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
resourceManagerRunners = startResourceManagers(
- configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+ configuration,
+ haServices,
+ heartbeatServices,
+ metricRegistry,
+ numResourceManagers,
+ resourceManagerRpcServices);
// bring up the TaskManager(s) for the mini cluster
LOG.info("Starting {} TaskManger(s)", numTaskManagers);
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 9a7a790..5467177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -64,6 +64,7 @@ import org.apache.flink.util.ExceptionUtils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -129,8 +130,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
public ResourceManager(
- ResourceID resourceId,
RpcService rpcService,
+ ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
@@ -359,7 +360,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
final ResourceID taskExecutorResourceId,
final SlotReport slotReport) {
- if (leaderSessionId.equals(resourceManagerLeaderId)) {
+ if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -384,7 +385,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
- // the task manager will not request heartbeat, so this method will never be called currently
+ // the ResourceManager will always send heartbeat requests to the
+ // TaskManager
}
@Override
@@ -394,7 +396,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
});
return new TaskExecutorRegistrationSuccess(
- registration.getInstanceID(), resourceId,
+ registration.getInstanceID(),
+ resourceId,
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
}
}
@@ -607,6 +610,30 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
/**
+ * This method should be called by the framework once it detects that a currently registered
+ * task executor has failed.
+ *
+ * @param resourceID Id of the TaskManager that has failed.
+ * @param cause The exception which caused the TaskManager to fail.
+ */
+ protected void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
+ taskManagerHeartbeatManager.unmonitorTarget(resourceID);
+
+ WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+
+ if (workerRegistration != null) {
+ log.info("Task manager {} failed because {}.", resourceID, cause);
+
+ // TODO :: suggest failed task executor to stop itself
+ slotManager.notifyTaskManagerFailure(resourceID);
+
+ workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+ } else {
+ log.debug("Could not find a registered task manager with the process id {}.", resourceID);
+ }
+ }
+
+ /**
* Checks whether the given resource manager leader id is matching the current leader id and
* not null.
*
@@ -756,30 +783,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
}
- /**
- * This method should be called by the framework once it detects that a currently registered
- * task executor has failed.
- *
- * @param resourceID Id of the TaskManager that has failed.
- * @param cause The exception which cause the TaskManager failed.
- */
- public void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
- taskManagerHeartbeatManager.unmonitorTarget(resourceID);
-
- WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
-
- if (workerRegistration != null) {
- log.info("Task manager {} failed because {}.", resourceID, cause);
-
- // TODO :: suggest failed task executor to stop itself
- slotManager.notifyTaskManagerFailure(resourceID);
-
- workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
- } else {
- log.debug("Could not find a registered task manager with the process id {}.", resourceID);
- }
- }
-
// ------------------------------------------------------------------------
// Framework specific behavior
// ------------------------------------------------------------------------
@@ -875,11 +878,17 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
@Override
- public void notifyHeartbeatTimeout(ResourceID resourceID) {
+ public void notifyHeartbeatTimeout(final ResourceID resourceID) {
log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
- closeTaskManagerConnection(resourceID, new TimeoutException(
- "Task manager with id " + resourceID + " heartbeat timed out."));
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ closeTaskManagerConnection(
+ resourceID,
+ new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
+ }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7741e0d..cda4a7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -134,13 +134,12 @@ public interface ResourceManagerGateway extends RpcGateway {
/**
* Sends the heartbeat to resource manager from task manager
*
- * @param resourceID unique id of the task manager
+ * @param heartbeatOrigin unique id of the task manager
*/
- void heartbeatFromTaskManager(final ResourceID resourceID);
+ void heartbeatFromTaskManager(final ResourceID heartbeatOrigin);
/**
- * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
- * {@link ResourceManager}.
+ * Disconnects a TaskManager specified by the given resourceID from the {@link ResourceManager}.
*
* @param resourceID identifying the TaskManager to disconnect
* @param cause for the disconnection of the TaskManager
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index d07e373..3a8baa6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -69,8 +69,8 @@ public class ResourceManagerRunner implements FatalErrorHandler {
rpcService.getScheduledExecutor());
this.resourceManager = new StandaloneResourceManager(
- resourceId,
rpcService,
+ resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index e2d6538..fd5a001 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -38,8 +38,8 @@ import org.apache.flink.runtime.rpc.RpcService;
public class StandaloneResourceManager extends ResourceManager<ResourceID> {
public StandaloneResourceManager(
- ResourceID resourceId,
RpcService rpcService,
+ ResourceID resourceId,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
@@ -48,8 +48,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
- resourceId,
rpcService,
+ resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f3e1ff3..4883e7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -696,17 +696,35 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
+ private void establishResourceManagerConnection(ResourceID resourceManagerResourceId) {
+ // monitor the resource manager as heartbeat target
+ resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
+ @Override
+ public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+ ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+ resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID, Void payload) {
+ // the TaskManager won't send heartbeat requests to the ResourceManager
+ }
+ });
+ }
+
private void closeResourceManagerConnection(Exception cause) {
- log.info("Close ResourceManager connection for {}.", cause);
+ validateRunsInMainThread();
if (isConnectedToResourceManager()) {
+ log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+ resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+
resourceManagerConnection.close();
resourceManagerConnection = null;
-
- resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
}
}
@@ -790,7 +808,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
"and returning them to the ResourceManager.", throwable);
// We encountered an exception. Free the slots and return them to the RM.
- for (SlotOffer reservedSlot : reservedSlots) {
+ for (SlotOffer reservedSlot: reservedSlots) {
freeSlot(reservedSlot.getAllocationId(), throwable);
}
}
@@ -841,6 +859,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
private void closeJobManagerConnection(JobID jobId, Exception cause) {
+ validateRunsInMainThread();
+
log.info("Close JobManager connection for job {}.", jobId);
// 1. fail tasks running under this JobID
@@ -1183,21 +1203,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
final ResourceID resourceManagerId = success.getResourceManagerId();
- // monitor the resource manager as heartbeat target
- resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new HeartbeatTarget<Void>() {
- @Override
- public void receiveHeartbeat(ResourceID resourceID, Void payload) {
- if (isConnectedToResourceManager()) {
- ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
- resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+ runAsync(
+ new Runnable() {
+ @Override
+ public void run() {
+ establishResourceManagerConnection(resourceManagerId);
}
}
-
- @Override
- public void requestHeartbeat(ResourceID resourceID, Void payload) {
- // request heartbeat will never be called on the task manager side
- }
- });
+ );
}
@Override
@@ -1277,14 +1290,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
runAsync(new Runnable() {
@Override
public void run() {
- log.info("Job manager with id {} heartbeat timed out.", resourceID);
+ log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
if (jobManagerConnections.containsKey(resourceID)) {
JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+
if (jobManagerConnection != null) {
closeJobManagerConnection(
jobManagerConnection.getJobID(),
- new TimeoutException("Job manager with id " + resourceID + " heartbeat timed out."));
+ new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
}
}
}
@@ -1305,16 +1319,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
@Override
- public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+ public void notifyHeartbeatTimeout(final ResourceID resourceId) {
runAsync(new Runnable() {
@Override
public void run() {
- log.info("Resource manager with id {} heartbeat timed out.", resourceID);
+ log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
- if (isConnectedToResourceManager() && resourceManagerConnection.getResourceManagerId().equals(resourceID)) {
- closeResourceManagerConnection(
- new TimeoutException("Resource manager with id " + resourceID + " heartbeat timed out."));
- }
+ closeResourceManagerConnection(
+ new TimeoutException(
+ "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
}
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index e7f2439..72925bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -76,7 +77,7 @@ import static org.mockito.Mockito.verify;
/**
* General tests for the resource manager component.
*/
-public class ResourceManagerTest {
+public class ResourceManagerTest extends TestLogger {
private static ActorSystem system;
@@ -393,8 +394,7 @@ public class ResourceManagerTest {
try {
final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
- resourceManagerResourceID,
- rpcService,
+ rpcService, resourceManagerResourceID,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
new file mode 100644
index 0000000..e628db5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+public class TestingHeartbeatServices extends HeartbeatServices {
+
+ private final ScheduledExecutor scheduledExecutorToUse;
+
+ public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
+ super(heartbeatInterval, heartbeatTimeout);
+
+ this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
+ }
+
+ @Override
+ public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+ ResourceID resourceId,
+ HeartbeatListener<I, O> heartbeatListener,
+ ScheduledExecutor scheduledExecutor,
+ Logger log) {
+
+ return new HeartbeatManagerSenderImpl<>(
+ heartbeatInterval,
+ heartbeatTimeout,
+ resourceId,
+ heartbeatListener,
+ org.apache.flink.runtime.concurrent.Executors.directExecutor(),
+ scheduledExecutorToUse,
+ log);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 73da244..ee8f51d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -26,7 +26,8 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.heartbeat.*;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -49,8 +50,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
@PrepareForTest(BlobLibraryCacheManager.class)
@@ -139,4 +143,6 @@ public class JobMasterTest extends TestLogger {
rpc.stopService();
}
}
+
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 39594df..c8e209d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -68,8 +68,7 @@ public class ResourceManagerHATest {
final ResourceManager resourceManager =
new StandaloneResourceManager(
- rmResourceId,
- rpcService,
+ rpcService, rmResourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 0401f9e..32b40ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -45,7 +46,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
-public class ResourceManagerJobMasterTest {
+public class ResourceManagerJobMasterTest extends TestLogger {
private TestingSerialRpcService rpcService;
@@ -216,8 +217,7 @@ public class ResourceManagerJobMasterTest {
Time.minutes(5L));
ResourceManager resourceManager = new StandaloneResourceManager(
- rmResourceId,
- rpcService,
+ rpcService, rmResourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 7c811d9..cb0a414 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -43,7 +44,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
-public class ResourceManagerTaskExecutorTest {
+public class ResourceManagerTaskExecutorTest extends TestLogger {
private TestingSerialRpcService rpcService;
@@ -148,7 +149,7 @@ public class ResourceManagerTaskExecutorTest {
private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
- HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+ HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L);
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
@@ -163,8 +164,8 @@ public class ResourceManagerTaskExecutorTest {
StandaloneResourceManager resourceManager =
new StandaloneResourceManager(
- resourceManagerResourceID,
rpcService,
+ resourceManagerResourceID,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 28ed697..68aff42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -237,8 +237,7 @@ public class SlotProtocolTest extends TestLogger {
ResourceManager<ResourceID> resourceManager =
Mockito.spy(new StandaloneResourceManager(
- rmResourceId,
- testRpcService,
+ testRpcService, rmResourceId,
resourceManagerConfiguration,
testingHaServices,
heartbeatServices,
@@ -325,8 +324,7 @@ public class SlotProtocolTest extends TestLogger {
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
- resourceId,
- rpcService,
+ rpcService, resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 4e76486..1789ace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -120,8 +120,7 @@ public class TaskExecutorITCase {
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
- rmResourceId,
- rpcService,
+ rpcService, rmResourceId,
resourceManagerConfiguration,
testingHAServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index d1f6e2e..330d4fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -224,13 +224,19 @@ public class TaskExecutorTest extends TestLogger {
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
when(rmGateway.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
- new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));
+ .thenReturn(
+ FlinkCompletableFuture.<RegistrationResponse>completed(
+ new TaskExecutorRegistrationSuccess(
+ new InstanceID(),
+ rmResourceId,
+ 10L)));
final TestingSerialRpcService rpc = new TestingSerialRpcService();
rpc.registerGateway(rmAddress, rmGateway);
- final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+ final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
+ null,
+ null);
final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
haServices.setResourceManagerLeaderRetriever(testLeaderService);
@@ -292,11 +298,11 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(rmAddress, rmLeaderId);
// register resource manager success will trigger monitoring heartbeat target between tm and rm
- verify(rmGateway).registerTaskExecutor(
+ verify(rmGateway, atLeast(1)).registerTaskExecutor(
eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
- verify(rmGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
+ verify(rmGateway, timeout(heartbeatTimeout * 50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 98f136a..7ba1633 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
# -----------------------------------------------------------------------------
# Console (use 'console')
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index ed672a3..21e6e45 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -199,10 +199,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
commonRpcService.getScheduledExecutor());
return new YarnResourceManager(
- ResourceID.generate(),
+ commonRpcService, ResourceID.generate(),
config,
ENV,
- commonRpcService,
resourceManagerConfiguration,
haServices,
heartbeatServices,
http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index a308079..f8cf275 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -107,10 +107,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
public YarnResourceManager(
+ RpcService rpcService,
ResourceID resourceId,
Configuration flinkConfig,
Map<String, String> env,
- RpcService rpcService,
ResourceManagerConfiguration resourceManagerConfiguration,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
@@ -119,8 +119,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
- resourceId,
rpcService,
+ resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,