You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:18 UTC

[07/50] [abbrv] flink git commit: [FLINK-4354] [heartbeat] Add heartbeats between the ResourceManager and TaskExecutor

[FLINK-4354] [heartbeat] Add heartbeats between the ResourceManager and TaskExecutor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b99f8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b99f8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b99f8a

Branch: refs/heads/table-retraction
Commit: 83b99f8a624ddf35deb934b4d4358582657998c6
Parents: fd90672
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Mar 22 12:03:45 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Mar 23 13:58:44 2017 +0100

----------------------------------------------------------------------
 .../heartbeat/TestingHeartbeatManagerImpl.java  | 63 -----------------
 .../TestingHeartbeatManagerSenderImpl.java      | 61 -----------------
 .../heartbeat/TestingHeartbeatServices.java     | 52 --------------
 .../flink/runtime/jobmaster/JobMaster.java      |  4 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  7 +-
 .../resourcemanager/ResourceManager.java        | 71 +++++++++++---------
 .../resourcemanager/ResourceManagerGateway.java |  7 +-
 .../resourcemanager/ResourceManagerRunner.java  |  2 +-
 .../StandaloneResourceManager.java              |  4 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 63 ++++++++++-------
 .../clusterframework/ResourceManagerTest.java   |  6 +-
 .../heartbeat/TestingHeartbeatServices.java     | 52 ++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  | 10 ++-
 .../resourcemanager/ResourceManagerHATest.java  |  3 +-
 .../ResourceManagerJobMasterTest.java           |  6 +-
 .../ResourceManagerTaskExecutorTest.java        |  7 +-
 .../slotmanager/SlotProtocolTest.java           |  6 +-
 .../taskexecutor/TaskExecutorITCase.java        |  3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 16 +++--
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  3 +-
 .../apache/flink/yarn/YarnResourceManager.java  |  4 +-
 22 files changed, 181 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
deleted file mode 100644
index a6e056d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerImpl.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.slf4j.Logger;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-
-/**
- * Heartbeat manager implementation which extends {@link HeartbeatManagerImpl} for testing.
- * It overrides the {@link #unmonitorTarget(ResourceID)} to wait for some tests complete
- * when notify heartbeat timeout.
- *
- * @param <I> Type of the incoming heartbeat payload
- * @param <O> Type of the outgoing heartbeat payload
- */
-public class TestingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
-
-	private final CountDownLatch waitLatch;
-
-	public TestingHeartbeatManagerImpl(
-			CountDownLatch waitLatch,
-			long heartbeatTimeoutIntervalMs,
-			ResourceID ownResourceID,
-			HeartbeatListener<I, O> heartbeatListener,
-			Executor executor,
-			ScheduledExecutor scheduledExecutor,
-			Logger log) {
-
-		super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
-
-		this.waitLatch = waitLatch;
-	}
-
-	@Override
-	public void unmonitorTarget(ResourceID resourceID) {
-		try {
-			waitLatch.await();
-		} catch (InterruptedException ex) {
-			log.error("Unexpected interrupted exception.", ex);
-		}
-
-		super.unmonitorTarget(resourceID);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
deleted file mode 100644
index 36f7e96..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatManagerSenderImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.slf4j.Logger;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-
-/**
- *
- * @param <I>
- * @param <O>
- */
-public class TestingHeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerSenderImpl<I, O> {
-
-	private final CountDownLatch waitLatch;
-
-	public TestingHeartbeatManagerSenderImpl(
-			CountDownLatch waitLatch,
-			long heartbeatPeriod,
-			long heartbeatTimeout,
-			ResourceID ownResourceID,
-			HeartbeatListener<I, O> heartbeatListener,
-			Executor executor,
-			ScheduledExecutor scheduledExecutor,
-			Logger log) {
-
-		super(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
-
-		this.waitLatch = waitLatch;
-	}
-
-	@Override
-	public void unmonitorTarget(ResourceID resourceID) {
-		try {
-			waitLatch.await();
-		} catch (InterruptedException ex) {
-			log.error("Unexpected interrupted exception.", ex);
-		}
-
-		super.unmonitorTarget(resourceID);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
deleted file mode 100644
index e628db5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.heartbeat;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-
-public class TestingHeartbeatServices extends HeartbeatServices {
-
-	private final ScheduledExecutor scheduledExecutorToUse;
-
-	public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
-		super(heartbeatInterval, heartbeatTimeout);
-
-		this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
-	}
-
-	@Override
-	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
-		ResourceID resourceId,
-		HeartbeatListener<I, O> heartbeatListener,
-		ScheduledExecutor scheduledExecutor,
-		Logger log) {
-
-		return new HeartbeatManagerSenderImpl<>(
-			heartbeatInterval,
-			heartbeatTimeout,
-			resourceId,
-			heartbeatListener,
-			org.apache.flink.runtime.concurrent.Executors.directExecutor(),
-			scheduledExecutorToUse,
-			log);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 81fc541..080b48e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1043,11 +1043,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		@Override
 		public void notifyHeartbeatTimeout(ResourceID resourceID) {
-			log.info("Task manager with id {} heartbeat timed out.", resourceID);
+			log.info("Heartbeat of TaskManager with id {} timed out.", resourceID);
 
 			getSelf().disconnectTaskManager(
 				resourceID,
-				new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
+				new TimeoutException("Heartbeat of TaskManager with id " + resourceID + " timed out."));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2cfba7b..9d5f9d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -241,7 +241,12 @@ public class MiniCluster {
 				// bring up the ResourceManager(s)
 				LOG.info("Starting {} ResourceManger(s)", numResourceManagers);
 				resourceManagerRunners = startResourceManagers(
-						configuration, haServices, heartbeatServices, metricRegistry, numResourceManagers, resourceManagerRpcServices);
+					configuration,
+					haServices,
+					heartbeatServices,
+					metricRegistry,
+					numResourceManagers,
+					resourceManagerRpcServices);
 
 				// bring up the TaskManager(s) for the mini cluster
 				LOG.info("Starting {} TaskManger(s)", numTaskManagers);

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 9a7a790..5467177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -64,6 +64,7 @@ import org.apache.flink.util.ExceptionUtils;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -129,8 +130,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
 	public ResourceManager(
-			ResourceID resourceId,
 			RpcService rpcService,
+			ResourceID resourceId,
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
@@ -359,7 +360,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		final ResourceID taskExecutorResourceId,
 		final SlotReport slotReport) {
 
-		if (leaderSessionId.equals(resourceManagerLeaderId)) {
+		if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
 			Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
 
 			return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -384,7 +385,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 						taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
 							@Override
 							public void receiveHeartbeat(ResourceID resourceID, Void payload) {
-								// the task manager will not request heartbeat, so this method will never be called currently
+								// the ResourceManager will always send heartbeat requests to the
+								// TaskManager
 							}
 
 							@Override
@@ -394,7 +396,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 						});
 
 						return new TaskExecutorRegistrationSuccess(
-							registration.getInstanceID(), resourceId,
+							registration.getInstanceID(),
+							resourceId,
 							resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
 					}
 				}
@@ -607,6 +610,30 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	/**
+	 * This method should be called by the framework once it detects that a currently registered
+	 * task executor has failed.
+	 *
+	 * @param resourceID Id of the TaskManager that has failed.
+	 * @param cause The exception which caused the TaskManager to fail.
+	 */
+	protected void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
+		taskManagerHeartbeatManager.unmonitorTarget(resourceID);
+
+		WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
+
+		if (workerRegistration != null) {
+			log.info("Task manager {} failed because {}.", resourceID, cause);
+
+			// TODO :: suggest failed task executor to stop itself
+			slotManager.notifyTaskManagerFailure(resourceID);
+
+			workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
+		} else {
+			log.debug("Could not find a registered task manager with the process id {}.", resourceID);
+		}
+	}
+
+	/**
 	 * Checks whether the given resource manager leader id is matching the current leader id and
 	 * not null.
 	 *
@@ -756,30 +783,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
 	}
 
-	/**
-	 * This method should be called by the framework once it detects that a currently registered
-	 * task executor has failed.
-	 *
-	 * @param resourceID Id of the TaskManager that has failed.
-	 * @param cause The exception which cause the TaskManager failed.
-	 */
-	public void closeTaskManagerConnection(final ResourceID resourceID, final Exception cause) {
-		taskManagerHeartbeatManager.unmonitorTarget(resourceID);
-
-		WorkerRegistration<WorkerType> workerRegistration = taskExecutors.remove(resourceID);
-
-		if (workerRegistration != null) {
-			log.info("Task manager {} failed because {}.", resourceID, cause);
-
-			// TODO :: suggest failed task executor to stop itself
-			slotManager.notifyTaskManagerFailure(resourceID);
-
-			workerRegistration.getTaskExecutorGateway().disconnectResourceManager(cause);
-		} else {
-			log.debug("Could not find a registered task manager with the process id {}.", resourceID);
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  Framework specific behavior
 	// ------------------------------------------------------------------------
@@ -875,11 +878,17 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private class TaskManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
 
 		@Override
-		public void notifyHeartbeatTimeout(ResourceID resourceID) {
+		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
 			log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
 
-			closeTaskManagerConnection(resourceID, new TimeoutException(
-					"Task manager with id " + resourceID + " heartbeat timed out."));
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					closeTaskManagerConnection(
+						resourceID,
+						new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out."));
+				}
+			});
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7741e0d..cda4a7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -134,13 +134,12 @@ public interface ResourceManagerGateway extends RpcGateway {
 	/**
 	 * Sends the heartbeat to resource manager from task manager
 	 *
-	 * @param resourceID unique id of the task manager
+	 * @param heartbeatOrigin unique id of the task manager
 	 */
-	void heartbeatFromTaskManager(final ResourceID resourceID);
+	void heartbeatFromTaskManager(final ResourceID heartbeatOrigin);
 
 	/**
-	 * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
-	 * {@link ResourceManager}.
+	 * Disconnects a TaskManager specified by the given resourceID from the {@link ResourceManager}.
 	 *
 	 * @param resourceID identifying the TaskManager to disconnect
 	 * @param cause for the disconnection of the TaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index d07e373..3a8baa6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -69,8 +69,8 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			rpcService.getScheduledExecutor());
 
 		this.resourceManager = new StandaloneResourceManager(
-			resourceId,
 			rpcService,
+			resourceId,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index e2d6538..fd5a001 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -38,8 +38,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	public StandaloneResourceManager(
-			ResourceID resourceId,
 			RpcService rpcService,
+			ResourceID resourceId,
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
@@ -48,8 +48,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 		super(
-			resourceId,
 			rpcService,
+			resourceId,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f3e1ff3..4883e7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -696,17 +696,35 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	private void establishResourceManagerConnection(ResourceID resourceManagerResourceId) {
+		// monitor the resource manager as heartbeat target
+		resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
+			@Override
+			public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+				resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+			}
+
+			@Override
+			public void requestHeartbeat(ResourceID resourceID, Void payload) {
+				// the TaskManager won't send heartbeat requests to the ResourceManager
+			}
+		});
+	}
+
 	private void closeResourceManagerConnection(Exception cause) {
-		log.info("Close ResourceManager connection for {}.", cause);
+		validateRunsInMainThread();
 
 		if (isConnectedToResourceManager()) {
+			log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+
 			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
 
 			ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+			resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
-
-			resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
 		}
 	}
 
@@ -790,7 +808,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 									"and returning them to the ResourceManager.", throwable);
 
 							// We encountered an exception. Free the slots and return them to the RM.
-							for (SlotOffer reservedSlot : reservedSlots) {
+							for (SlotOffer reservedSlot: reservedSlots) {
 								freeSlot(reservedSlot.getAllocationId(), throwable);
 							}
 						}
@@ -841,6 +859,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	}
 
 	private void closeJobManagerConnection(JobID jobId, Exception cause) {
+		validateRunsInMainThread();
+
 		log.info("Close JobManager connection for job {}.", jobId);
 
 		// 1. fail tasks running under this JobID
@@ -1183,21 +1203,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		public void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
 			final ResourceID resourceManagerId = success.getResourceManagerId();
 
-			// monitor the resource manager as heartbeat target
-			resourceManagerHeartbeatManager.monitorTarget(resourceManagerId, new HeartbeatTarget<Void>() {
-				@Override
-				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
-					if (isConnectedToResourceManager()) {
-						ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-						resourceManagerGateway.heartbeatFromTaskManager(resourceID);
+			runAsync(
+				new Runnable() {
+					@Override
+					public void run() {
+						establishResourceManagerConnection(resourceManagerId);
 					}
 				}
-
-				@Override
-				public void requestHeartbeat(ResourceID resourceID, Void payload) {
-					// request heartbeat will never be called on the task manager side
-				}
-			});
+			);
 		}
 
 		@Override
@@ -1277,14 +1290,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					log.info("Job manager with id {} heartbeat timed out.", resourceID);
+					log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
 
 					if (jobManagerConnections.containsKey(resourceID)) {
 						JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+
 						if (jobManagerConnection != null) {
 							closeJobManagerConnection(
 								jobManagerConnection.getJobID(),
-								new TimeoutException("Job manager with id " + resourceID + " heartbeat timed out."));
+								new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
 						}
 					}
 				}
@@ -1305,16 +1319,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
 
 		@Override
-		public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					log.info("Resource manager with id {} heartbeat timed out.", resourceID);
+					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
-					if (isConnectedToResourceManager() && resourceManagerConnection.getResourceManagerId().equals(resourceID)) {
-						closeResourceManagerConnection(
-								new TimeoutException("Resource manager with id " + resourceID + " heartbeat timed out."));
-					}
+					closeResourceManagerConnection(
+						new TimeoutException(
+							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
 				}
 			});
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index e7f2439..72925bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -76,7 +77,7 @@ import static org.mockito.Mockito.verify;
 /**
  * General tests for the resource manager component.
  */
-public class ResourceManagerTest {
+public class ResourceManagerTest extends TestLogger {
 
 	private static ActorSystem system;
 
@@ -393,8 +394,7 @@ public class ResourceManagerTest {
 
 		try {
 			final StandaloneResourceManager resourceManager = new StandaloneResourceManager(
-				resourceManagerResourceID,
-				rpcService,
+				rpcService, resourceManagerResourceID,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
new file mode 100644
index 0000000..e628db5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/heartbeat/TestingHeartbeatServices.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.heartbeat;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+
+public class TestingHeartbeatServices extends HeartbeatServices {
+	// Test variant of HeartbeatServices: sender heartbeat managers created here use a fixed, constructor-supplied ScheduledExecutor.
+	private final ScheduledExecutor scheduledExecutorToUse;
+
+	public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, ScheduledExecutor scheduledExecutorToUse) {
+		super(heartbeatInterval, heartbeatTimeout);
+		// The executor is mandatory; checkNotNull presumably throws a NullPointerException on null -- confirm in Preconditions.
+		this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
+	}
+	// Only the sender factory is overridden in this class; its scheduledExecutor argument is ignored in favor of scheduledExecutorToUse.
+	@Override
+	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
+		ResourceID resourceId,
+		HeartbeatListener<I, O> heartbeatListener,
+		ScheduledExecutor scheduledExecutor,
+		Logger log) {
+		// NOTE(review): a direct executor is passed ahead of the scheduled executor -- presumably so callbacks run on the calling thread; confirm against HeartbeatManagerSenderImpl.
+		return new HeartbeatManagerSenderImpl<>(
+			heartbeatInterval,
+			heartbeatTimeout,
+			resourceId,
+			heartbeatListener,
+			org.apache.flink.runtime.concurrent.Executors.directExecutor(),
+			scheduledExecutorToUse,
+			log);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 73da244..ee8f51d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -26,7 +26,8 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.heartbeat.*;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -49,8 +50,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(BlobLibraryCacheManager.class)
@@ -139,4 +143,6 @@ public class JobMasterTest extends TestLogger {
 			rpc.stopService();
 		}
 	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 39594df..c8e209d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -68,8 +68,7 @@ public class ResourceManagerHATest {
 
 		final ResourceManager resourceManager =
 			new StandaloneResourceManager(
-				rmResourceId,
-				rpcService,
+				rpcService, rmResourceId,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 0401f9e..32b40ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,7 +46,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
-public class ResourceManagerJobMasterTest {
+public class ResourceManagerJobMasterTest extends TestLogger {
 
 	private TestingSerialRpcService rpcService;
 
@@ -216,8 +217,7 @@ public class ResourceManagerJobMasterTest {
 			Time.minutes(5L));
 
 		ResourceManager resourceManager = new StandaloneResourceManager(
-			rmResourceId,
-			rpcService,
+			rpcService, rmResourceId,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 7c811d9..cb0a414 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,7 +44,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class ResourceManagerTaskExecutorTest {
+public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private TestingSerialRpcService rpcService;
 
@@ -148,7 +149,7 @@ public class ResourceManagerTaskExecutorTest {
 
 	private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
+		HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L);
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
@@ -163,8 +164,8 @@ public class ResourceManagerTaskExecutorTest {
 
 		StandaloneResourceManager resourceManager =
 			new StandaloneResourceManager(
-				resourceManagerResourceID,
 				rpcService,
+				resourceManagerResourceID,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 28ed697..68aff42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -237,8 +237,7 @@ public class SlotProtocolTest extends TestLogger {
 
 		ResourceManager<ResourceID> resourceManager =
 			Mockito.spy(new StandaloneResourceManager(
-				rmResourceId,
-				testRpcService,
+				testRpcService, rmResourceId,
 				resourceManagerConfiguration,
 				testingHaServices,
 				heartbeatServices,
@@ -325,8 +324,7 @@ public class SlotProtocolTest extends TestLogger {
 				JobLeaderIdService jobLeaderIdService,
 				FatalErrorHandler fatalErrorHandler) {
 			super(
-				resourceId,
-				rpcService,
+				rpcService, resourceId,
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 4e76486..1789ace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -120,8 +120,7 @@ public class TaskExecutorITCase {
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 
 		ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
-			rmResourceId,
-			rpcService,
+			rpcService, rmResourceId,
 			resourceManagerConfiguration,
 			testingHAServices,
 			heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index d1f6e2e..330d4fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -224,13 +224,19 @@ public class TaskExecutorTest extends TestLogger {
 		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 		when(rmGateway.registerTaskExecutor(
 			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-			.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
-					new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));
+			.thenReturn(
+				FlinkCompletableFuture.<RegistrationResponse>completed(
+					new TaskExecutorRegistrationSuccess(
+						new InstanceID(),
+						rmResourceId,
+						10L)));
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
 		rpc.registerGateway(rmAddress, rmGateway);
 
-		final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+		final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
+			null,
+			null);
 		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
 		haServices.setResourceManagerLeaderRetriever(testLeaderService);
 
@@ -292,11 +298,11 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(rmAddress, rmLeaderId);
 
 			// register resource manager success will trigger monitoring heartbeat target between tm and rm
-			verify(rmGateway).registerTaskExecutor(
+			verify(rmGateway, atLeast(1)).registerTaskExecutor(
 					eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
 
 			// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
-			verify(rmGateway, timeout(heartbeatTimeout * 5)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
+			verify(rmGateway, timeout(heartbeatTimeout * 50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 98f136a..7ba1633 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index ed672a3..21e6e45 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -199,10 +199,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 			commonRpcService.getScheduledExecutor());
 
 		return new YarnResourceManager(
-			ResourceID.generate(),
+			commonRpcService, ResourceID.generate(),
 			config,
 			ENV,
-			commonRpcService,
 			resourceManagerConfiguration,
 			haServices,
 			heartbeatServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/83b99f8a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index a308079..f8cf275 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -107,10 +107,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>();
 
 	public YarnResourceManager(
+			RpcService rpcService,
 			ResourceID resourceId,
 			Configuration flinkConfig,
 			Map<String, String> env,
-			RpcService rpcService,
 			ResourceManagerConfiguration resourceManagerConfiguration,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
@@ -119,8 +119,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 		super(
-			resourceId,
 			rpcService,
+			resourceId,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
 			heartbeatServices,