You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/20 17:38:14 UTC

flink git commit: [FLINK-5999] [resMgnr] Move JobLeaderIdService shut down into ResourceManagerRunner

Repository: flink
Updated Branches:
  refs/heads/master 7e9557afe -> 124872319


[FLINK-5999] [resMgnr] Move JobLeaderIdService shut down into ResourceManagerRunner

The JobLeaderIdService is created by the ResourceManagerRunner and then passed to the
ResourceManager. Previously, the ResourceManager stopped the service before being stopped
itself. This could lead to a concurrent modification exception caused by a state-changing
action executed by the actor thread. In order to avoid this concurrent modification, the
service's shutdown is now executed after the ResourceManager has been shut down.

This closes #3526.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12487231
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12487231
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12487231

Branch: refs/heads/master
Commit: 124872319889206b8ceaa7222c25318047102f6e
Parents: 7e9557a
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Mar 13 15:55:02 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Mar 20 18:35:58 2017 +0100

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        | 10 +--
 .../ResourceManagerConfiguration.java           | 22 +------
 .../resourcemanager/ResourceManagerRunner.java  | 37 ++++++++---
 .../ResourceManagerRuntimeServices.java         | 69 ++++++++++++++++++++
 ...urceManagerRuntimeServicesConfiguration.java | 59 +++++++++++++++++
 .../resourcemanager/ResourceManagerHATest.java  | 17 +++--
 .../ResourceManagerJobMasterTest.java           |  5 +-
 .../ResourceManagerTaskExecutorTest.java        |  7 +-
 .../slotmanager/SlotProtocolTest.java           | 10 ++-
 .../taskexecutor/TaskExecutorITCase.java        |  5 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  | 21 +++---
 11 files changed, 191 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index badfbe2..91fbba6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -179,13 +179,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		Exception exception = null;
 
 		try {
-			jobLeaderIdService.stop();
-		} catch (Exception e) {
-			exception = ExceptionUtils.firstOrSuppressed(e, exception);
-		}
-
-		try {
-			leaderElectionService.stop();
+			super.shutDown();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}
@@ -193,7 +187,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		clearState();
 
 		try {
-			super.shutDown();
+			leaderElectionService.stop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
index d04d852..fa75bbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerConfiguration.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
@@ -33,15 +32,12 @@ public class ResourceManagerConfiguration {
 
 	private final Time timeout;
 	private final Time heartbeatInterval;
-	private final Time jobTimeout;
 
 	public ResourceManagerConfiguration(
 			Time timeout,
-			Time heartbeatInterval,
-			Time jobTimeout) {
+			Time heartbeatInterval) {
 		this.timeout = Preconditions.checkNotNull(timeout, "timeout");
 		this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval");
-		this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout");
 	}
 
 	public Time getTimeout() {
@@ -52,10 +48,6 @@ public class ResourceManagerConfiguration {
 		return heartbeatInterval;
 	}
 
-	public Time getJobTimeout() {
-		return jobTimeout;
-	}
-
 	// --------------------------------------------------------------------------
 	// Static factory methods
 	// --------------------------------------------------------------------------
@@ -81,16 +73,6 @@ public class ResourceManagerConfiguration {
 				"value " + AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e);
 		}
 
-		final String strJobTimeout = configuration.getString(ResourceManagerOptions.JOB_TIMEOUT);
-		final Time jobTimeout;
-
-		try {
-			jobTimeout = Time.milliseconds(Duration.apply(strJobTimeout).toMillis());
-		} catch (NumberFormatException e) {
-			throw new ConfigurationException("Could not parse the resource manager's job timeout " +
-				"value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e);
-		}
-
-		return new ResourceManagerConfiguration(timeout, heartbeatInterval, jobTimeout);
+		return new ResourceManagerConfiguration(timeout, heartbeatInterval);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 749b407..73b27b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +38,8 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 
 	private final Object lock = new Object();
 
+	private final ResourceManagerRuntimeServices resourceManagerRuntimeServices;
+
 	private final ResourceManager<?> resourceManager;
 
 	public ResourceManagerRunner(
@@ -53,19 +54,21 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 		Preconditions.checkNotNull(metricRegistry);
 
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
-		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
-		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+
+		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+
+		resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			resourceManagerRuntimeServicesConfiguration,
 			highAvailabilityServices,
-			rpcService.getScheduledExecutor(),
-			resourceManagerConfiguration.getJobTimeout());
+			rpcService.getScheduledExecutor());
 
 		this.resourceManager = new StandaloneResourceManager(
 			rpcService,
 			resourceManagerConfiguration,
 			highAvailabilityServices,
-			slotManagerFactory,
+			resourceManagerRuntimeServices.getSlotManagerFactory(),
 			metricRegistry,
-			jobLeaderIdService,
+			resourceManagerRuntimeServices.getJobLeaderIdService(),
 			this);
 	}
 
@@ -82,8 +85,24 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 	}
 
 	private void shutDownInternally() throws Exception {
+		Exception exception = null;
 		synchronized (lock) {
-			resourceManager.shutDown();
+
+			try {
+				resourceManager.shutDown();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			try {
+				resourceManagerRuntimeServices.shutDown();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			if (exception != null) {
+				ExceptionUtils.rethrow(exception, "Error while shutting down the resource manager runner.");
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
new file mode 100644
index 0000000..56edde4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServices.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Container class for the {@link ResourceManager} services.
+ */
+public class ResourceManagerRuntimeServices {
+
+	private final SlotManagerFactory slotManagerFactory;
+	private final JobLeaderIdService jobLeaderIdService;
+
+	public ResourceManagerRuntimeServices(SlotManagerFactory slotManagerFactory, JobLeaderIdService jobLeaderIdService) {
+		this.slotManagerFactory = Preconditions.checkNotNull(slotManagerFactory);
+		this.jobLeaderIdService = Preconditions.checkNotNull(jobLeaderIdService);
+	}
+
+	public SlotManagerFactory getSlotManagerFactory() {
+		return slotManagerFactory;
+	}
+
+	public JobLeaderIdService getJobLeaderIdService() {
+		return jobLeaderIdService;
+	}
+
+	// -------------------- Lifecycle methods -----------------------------------
+
+	public void shutDown() throws Exception {
+		jobLeaderIdService.stop();
+	}
+
+	// -------------------- Static methods --------------------------------------
+
+	public static ResourceManagerRuntimeServices fromConfiguration(
+			ResourceManagerRuntimeServicesConfiguration configuration,
+			HighAvailabilityServices highAvailabilityServices,
+			ScheduledExecutor scheduledExecutor) throws Exception {
+
+		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			scheduledExecutor,
+			configuration.getJobTimeout());
+
+		return new ResourceManagerRuntimeServices(slotManagerFactory, jobLeaderIdService);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
new file mode 100644
index 0000000..6de5f4d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRuntimeServicesConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Configuration class for the {@link ResourceManagerRuntimeServices} class.
+ */
+public class ResourceManagerRuntimeServicesConfiguration {
+
+	private final Time jobTimeout;
+
+	public ResourceManagerRuntimeServicesConfiguration(Time jobTimeout) {
+		this.jobTimeout = Preconditions.checkNotNull(jobTimeout);
+	}
+
+	public Time getJobTimeout() {
+		return jobTimeout;
+	}
+
+	// ---------------------------- Static methods ----------------------------------
+
+	public static ResourceManagerRuntimeServicesConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
+
+		final String strJobTimeout = configuration.getString(ResourceManagerOptions.JOB_TIMEOUT);
+		final Time jobTimeout;
+
+		try {
+			jobTimeout = Time.milliseconds(Duration.apply(strJobTimeout).toMillis());
+		} catch (NumberFormatException e) {
+			throw new ConfigurationException("Could not parse the resource manager's job timeout " +
+				"value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e);
+		}
+
+		return new ResourceManagerRuntimeServicesConfiguration(jobTimeout);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 58dedc3..1aa799b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -48,14 +48,17 @@ public class ResourceManagerHATest {
 
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
-			Time.seconds(5L),
-			Time.minutes(5L));
+			Time.seconds(5L));
+
+		ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(Time.seconds(5L));
+		ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			resourceManagerRuntimeServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
 		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor(),
-			resourceManagerConfiguration.getJobTimeout());
+
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 		final ResourceManager resourceManager =
@@ -65,7 +68,7 @@ public class ResourceManagerHATest {
 				highAvailabilityServices,
 				slotManagerFactory,
 				metricRegistry,
-				jobLeaderIdService,
+				resourceManagerRuntimeServices.getJobLeaderIdService(),
 				testingFatalErrorHandler);
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 031f76e..fb166d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -195,14 +195,13 @@ public class ResourceManagerJobMasterTest {
 
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
-			Time.seconds(5L),
-			Time.minutes(5L));
+			Time.seconds(5L));
 		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
 		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor(),
-			resourceManagerConfiguration.getJobTimeout());
+			Time.minutes(5L));
 
 		ResourceManager resourceManager = new StandaloneResourceManager(
 			rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 4456235..0a1addb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -148,14 +148,13 @@ public class ResourceManagerTaskExecutorTest {
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
-			Time.seconds(5L),
-			Time.minutes(5L));
+			Time.seconds(5L));
+
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
 		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor(),
-			resourceManagerConfiguration.getJobTimeout());
-
+			Time.minutes(5L));
 
 		StandaloneResourceManager resourceManager =
 			new StandaloneResourceManager(

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 1e5edbe..ea660f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -109,13 +109,12 @@ public class SlotProtocolTest extends TestLogger {
 
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
-			Time.seconds(5L),
-			Time.minutes(5L));
+			Time.seconds(5L));
 
 		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			testingHaServices,
 			testRpcService.getScheduledExecutor(),
-			resourceManagerConfiguration.getJobTimeout());
+			Time.seconds(5L));
 
 		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		SpiedResourceManager resourceManager =
@@ -217,13 +216,12 @@ public class SlotProtocolTest extends TestLogger {
 
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
-			Time.seconds(5L),
-			Time.minutes(5L));
+			Time.seconds(5L));
 
 		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			testingHaServices,
 			testRpcService.getScheduledExecutor(),
-			resourceManagerConfiguration.getJobTimeout());
+			Time.seconds(5L));
 
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		ResourceManager<ResourceID> resourceManager =

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 16edbf7..43f33a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -97,13 +97,12 @@ public class TaskExecutorITCase {
 		TestingSerialRpcService rpcService = new TestingSerialRpcService();
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.milliseconds(500L),
-			Time.milliseconds(500L),
-			Time.minutes(5L));
+			Time.milliseconds(500L));
 		SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
 		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			testingHAServices,
 			rpcService.getScheduledExecutor(),
-			resourceManagerConfiguration.getJobTimeout());
+			Time.minutes(5L));
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
 		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class, RETURNS_MOCKS);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12487231/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 6fb7c86..7a0dbbe 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -35,11 +35,10 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -64,8 +63,8 @@ import java.io.ObjectInputStream;
  * <p>The lifetime of the YARN application bound to that of the Flink job. Other
  * YARN Application Master implementations are for example the YARN session.
  * 
- * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobManagerRunner}
- * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ * It starts actor system and the actors for {@link JobManagerRunner}
+ * and {@link YarnResourceManager}.
  *
  * The JobMasnagerRunner start a {@link org.apache.flink.runtime.jobmaster.JobMaster}
  * JobMaster handles Flink job execution, while the YarnResourceManager handles container
@@ -193,20 +192,20 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 
 	private ResourceManager<?> createResourceManager(Configuration config) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config);
-		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
-		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
+		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(config);
+		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			resourceManagerRuntimeServicesConfiguration,
 			haServices,
-			commonRpcService.getScheduledExecutor(),
-			resourceManagerConfiguration.getJobTimeout());
+			commonRpcService.getScheduledExecutor());
 
 		return new YarnResourceManager(config,
 				ENV,
 				commonRpcService,
 				resourceManagerConfiguration,
 				haServices,
-				slotManagerFactory,
+				resourceManagerRuntimeServices.getSlotManagerFactory(),
 				metricRegistry,
-				jobLeaderIdService,
+				resourceManagerRuntimeServices.getJobLeaderIdService(),
 				this);
 	}