You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:43:11 UTC

[flink] 01/04: [FLINK-10411] Make ClusterEntrypoint more compositional

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1fcbc9f816460b1a159fd79b4c9bcf83327d79c3
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 13:39:34 2018 +0200

    [FLINK-10411] Make ClusterEntrypoint more compositional
    
    Introduce a ClusterComponent which encapsulates the logic for starting the cluster
    components: the Dispatcher, the RestServerEndpoint and the ResourceManager. The individual
    components are created by using a factory instance. The ClusterEntrypoint is now
    only responsible for managing the services required by the ClusterComponent.
    This design should make the testing of these components easier, improve reusability
    and reduce code duplication.
---
 .../entrypoint/ClassPathJobGraphRetriever.java     |  80 +++++
 .../entrypoint/StandaloneJobClusterEntryPoint.java |  85 +----
 ...st.java => ClassPathJobGraphRetrieverTest.java} |  18 +-
 .../apache/flink/container/entrypoint/TestJob.java |   2 +-
 .../entrypoint/MesosJobClusterEntrypoint.java      |  89 +-----
 .../entrypoint/MesosSessionClusterEntrypoint.java  |  61 +---
 .../MesosResourceManagerFactory.java               |  90 ++++++
 .../runtime/dispatcher/DispatcherFactory.java      |  53 ++++
 .../runtime/dispatcher/JobDispatcherFactory.java   |  86 +++++
 .../dispatcher/SessionDispatcherFactory.java       |  69 ++++
 .../flink/runtime/entrypoint/ClusterComponent.java | 347 +++++++++++++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java      | 284 +++--------------
 .../runtime/entrypoint/FileJobGraphRetriever.java  |  70 +++++
 .../runtime/entrypoint/JobClusterComponent.java    |  51 +++
 .../runtime/entrypoint/JobClusterEntrypoint.java   |  98 ------
 .../runtime/entrypoint/JobGraphRetriever.java      |  38 +++
 .../entrypoint/SessionClusterComponent.java        |  34 ++
 .../entrypoint/SessionClusterEntrypoint.java       |  79 -----
 .../StandaloneSessionClusterEntrypoint.java        |  43 +--
 .../resourcemanager/ResourceManagerFactory.java    |  50 +++
 .../StandaloneResourceManagerFactory.java}         |  46 +--
 .../flink/runtime/rest/JobRestEndpointFactory.java |  66 ++++
 .../flink/runtime/rest/RestEndpointFactory.java    |  50 +++
 .../runtime/rest/SessionRestEndpointFactory.java   |  65 ++++
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |  83 +----
 ...ypoint.java => YarnResourceManagerFactory.java} |  69 +---
 .../entrypoint/YarnSessionClusterEntrypoint.java   |  47 +--
 27 files changed, 1255 insertions(+), 898 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
new file mode 100644
index 0000000..5ded04b
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link JobGraphRetriever} which creates the {@link JobGraph} from a class
+ * on the class path.
+ */
+public class ClassPathJobGraphRetriever implements JobGraphRetriever {
+
+	@Nonnull
+	private final String jobClassName;
+
+	@Nonnull
+	private final SavepointRestoreSettings savepointRestoreSettings;
+
+	@Nonnull
+	private final String[] programArguments;
+
+	public ClassPathJobGraphRetriever(
+			@Nonnull String jobClassName,
+			@Nonnull SavepointRestoreSettings savepointRestoreSettings,
+			@Nonnull String[] programArguments) {
+		this.jobClassName = jobClassName;
+		this.savepointRestoreSettings = savepointRestoreSettings;
+		this.programArguments = programArguments;
+	}
+
+	@Override
+	public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		final PackagedProgram packagedProgram = createPackagedProgram();
+		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+		try {
+			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
+			jobGraph.setAllowQueuedScheduling(true);
+			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+
+			return jobGraph;
+		} catch (Exception e) {
+			throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
+		}
+	}
+
+	private PackagedProgram createPackagedProgram() throws FlinkException {
+		try {
+			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
+			return new PackagedProgram(mainClass, programArguments);
+		} catch (ClassNotFoundException | ProgramInvocationException e) {
+			throw new FlinkException("Could not load the provided entrypoint class.", e);
+		}
+	}
+}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 0e40095..cdd44b5 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -18,38 +18,20 @@
 
 package org.apache.flink.container.entrypoint;
 
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,62 +61,10 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		final PackagedProgram packagedProgram = createPackagedProgram();
-		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-		try {
-			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
-			jobGraph.setAllowQueuedScheduling(true);
-			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
-
-			return jobGraph;
-		} catch (Exception e) {
-			throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
-		}
-	}
-
-	private PackagedProgram createPackagedProgram() throws FlinkException {
-		try {
-			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
-			return new PackagedProgram(mainClass, programArguments);
-		} catch (ClassNotFoundException | ProgramInvocationException e) {
-			throw new FlinkException("Could not load the provided entrypoint class.", e);
-		}
-	}
-
-	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
-		terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true));
-	}
-
-	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			resourceManagerRuntimeServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new StandaloneResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManager(),
-			metricRegistry,
-			resourceManagerRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler);
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new JobClusterComponent(
+			StandaloneResourceManagerFactory.INSTANCE,
+			new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
 	}
 
 	public static void main(String[] args) {
@@ -166,5 +96,4 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
 		entrypoint.startCluster();
 	}
-
 }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
similarity index 81%
rename from flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
rename to flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
index 2faec4c..6e460e1 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
@@ -32,24 +32,24 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 /**
- * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ * Tests for the {@link ClassPathJobGraphRetriever}.
  */
-public class StandaloneJobClusterEntryPointTest extends TestLogger {
+public class ClassPathJobGraphRetrieverTest extends TestLogger {
 
 	public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
 
 	@Test
 	public void testJobGraphRetrieval() throws FlinkException {
-		final Configuration configuration = new Configuration();
 		final int parallelism = 42;
+		final Configuration configuration = new Configuration();
 		configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
-		final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
-			configuration,
+
+		final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
 			TestJob.class.getCanonicalName(),
 			SavepointRestoreSettings.none(),
 			PROGRAM_ARGUMENTS);
 
-		final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
+		final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
 		assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
 		assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
@@ -59,13 +59,13 @@ public class StandaloneJobClusterEntryPointTest extends TestLogger {
 	public void testSavepointRestoreSettings() throws FlinkException {
 		final Configuration configuration = new Configuration();
 		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true);
-		final StandaloneJobClusterEntryPoint jobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
-			configuration,
+
+		final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
 			TestJob.class.getCanonicalName(),
 			savepointRestoreSettings,
 			PROGRAM_ARGUMENTS);
 
-		final JobGraph jobGraph = jobClusterEntryPoint.retrieveJobGraph(configuration);
+		final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
 		assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
 	}
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
index ada434d..27d1f32 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
 /**
- * Test job which is used for {@link StandaloneJobClusterEntryPointTest}.
+ * Test job which is used for {@link ClassPathJobGraphRetrieverTest}.
  */
 public class TestJob {
 
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 38b61d8..ed2175a 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -20,31 +20,21 @@ package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
@@ -52,13 +42,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -66,8 +49,6 @@ import java.util.concurrent.CompletableFuture;
  */
 public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
-	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
 	// ------------------------------------------------------------------------
 	//  Command-line options
 	// ------------------------------------------------------------------------
@@ -114,58 +95,6 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new MesosResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			configuration,
-			mesosServices,
-			schedulerConfiguration,
-			taskManagerParameters,
-			taskManagerContainerSpec,
-			webInterfaceUrl);
-	}
-
-	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-		File fp = new File(jobGraphFile);
-
-		try (FileInputStream input = new FileInputStream(fp);
-			ObjectInputStream obInput = new ObjectInputStream(input)) {
-
-			return (JobGraph) obInput.readObject();
-		} catch (FileNotFoundException e) {
-			throw new FlinkException("Could not find the JobGraph file.", e);
-		} catch (ClassNotFoundException | IOException e) {
-			throw new FlinkException("Could not load the JobGraph from file.", e);
-		}
-	}
-
-	@Override
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);
 
@@ -179,7 +108,15 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {}
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new JobClusterComponent(
+			new MesosResourceManagerFactory(
+				mesosServices,
+				schedulerConfiguration,
+				taskManagerParameters,
+				taskManagerContainerSpec),
+			FileJobGraphRetriever.createFrom(configuration));
+	}
 
 	public static void main(String[] args) {
 		// startup checks and logging
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 70369f6..0cc3053 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -20,25 +20,17 @@ package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -49,8 +41,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -104,42 +94,6 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new MesosResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			configuration,
-			mesosServices,
-			mesosConfig,
-			taskManagerParameters,
-			taskManagerContainerSpec,
-			webInterfaceUrl);
-	}
-
-	@Override
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);
 
@@ -152,6 +106,15 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			});
 	}
 
+	@Override
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new SessionClusterComponent(new MesosResourceManagerFactory(
+			mesosServices,
+			mesosConfig,
+			taskManagerParameters,
+			taskManagerContainerSpec));
+	}
+
 	public static void main(String[] args) {
 		// startup checks and logging
 		EnvironmentInformation.logEnvironmentInfo(LOG, MesosSessionClusterEntrypoint.class.getSimpleName(), args);
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
new file mode 100644
index 0000000..9582e9f
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
+ */
+public class MesosResourceManagerFactory implements ResourceManagerFactory<RegisteredMesosWorkerNode> {
+
+	@Nonnull
+	private final MesosServices mesosServices;
+
+	@Nonnull
+	private final MesosConfiguration schedulerConfiguration;
+
+	@Nonnull
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	@Nonnull
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	public MesosResourceManagerFactory(@Nonnull MesosServices mesosServices, @Nonnull MesosConfiguration schedulerConfiguration, @Nonnull MesosTaskManagerParameters taskManagerParameters, @Nonnull ContainerSpecification taskManagerContainerSpec) {
+		this.mesosServices = mesosServices;
+		this.schedulerConfiguration = schedulerConfiguration;
+		this.taskManagerParameters = taskManagerParameters;
+		this.taskManagerContainerSpec = taskManagerContainerSpec;
+	}
+
+	@Override
+	public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception {
+		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			rmServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new MesosResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			highAvailabilityServices,
+			heartbeatServices,
+			rmRuntimeServices.getSlotManager(),
+			metricRegistry,
+			rmRuntimeServices.getJobLeaderIdService(),
+			clusterInformation,
+			fatalErrorHandler,
+			configuration,
+			mesosServices,
+			schedulerConfiguration,
+			taskManagerParameters,
+			taskManagerContainerSpec,
+			webInterfaceUrl);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
new file mode 100644
index 0000000..5952299
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * Factory interface for {@link Dispatcher} instances; {@code T} is the dispatcher type which is created.
+ */
+public interface DispatcherFactory<T extends Dispatcher> {
+
+	/**
+	 * Creates a {@link Dispatcher} of type {@code T} from the given services; failures are reported by throwing.
+	 */
+	T createDispatcher(
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		ResourceManagerGateway resourceManagerGateway,
+		BlobServer blobServer,
+		HeartbeatServices heartbeatServices,
+		JobManagerMetricGroup jobManagerMetricGroup,
+		@Nullable String metricQueryServicePath,
+		ArchivedExecutionGraphStore archivedExecutionGraphStore,
+		FatalErrorHandler fatalErrorHandler,
+		@Nullable String restAddress,
+		HistoryServerArchivist historyServerArchivist) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
new file mode 100644
index 0000000..488f2fc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link MiniDispatcher} for the job of a {@link JobGraphRetriever}.
+ */
+public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
+
+	private final JobGraphRetriever jobGraphRetriever;
+
+	public JobDispatcherFactory(JobGraphRetriever jobGraphRetriever) {
+		this.jobGraphRetriever = jobGraphRetriever;
+	}
+
+	@Override
+	public MiniDispatcher createDispatcher(
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			ResourceManagerGateway resourceManagerGateway,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			JobManagerMetricGroup jobManagerMetricGroup,
+			@Nullable String metricQueryServicePath,
+			ArchivedExecutionGraphStore archivedExecutionGraphStore,
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String restAddress,
+			HistoryServerArchivist historyServerArchivist) throws Exception {
+		// Retrieve the job first; a retrieval failure propagates to the caller unchanged.
+		final JobGraph retrievedJobGraph = jobGraphRetriever.retrieveJobGraph(configuration);
+
+		// The configured EXECUTION_MODE string is parsed via ExecutionMode.valueOf.
+		final ClusterEntrypoint.ExecutionMode mode = ClusterEntrypoint.ExecutionMode.valueOf(configuration.getString(EXECUTION_MODE));
+
+		return new MiniDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME,
+			configuration,
+			highAvailabilityServices,
+			resourceManagerGateway,
+			blobServer,
+			heartbeatServices,
+			jobManagerMetricGroup,
+			metricQueryServicePath,
+			archivedExecutionGraphStore,
+			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			fatalErrorHandler,
+			restAddress,
+			historyServerArchivist,
+			retrievedJobGraph,
+			mode);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
new file mode 100644
index 0000000..18e29a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * Singleton {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
+ */
+public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
+	INSTANCE;
+
+	@Override
+	public Dispatcher createDispatcher(
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			ResourceManagerGateway resourceManagerGateway,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			JobManagerMetricGroup jobManagerMetricGroup,
+			@Nullable String metricQueryServicePath,
+			ArchivedExecutionGraphStore archivedExecutionGraphStore,
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String restAddress,
+			HistoryServerArchivist historyServerArchivist) throws Exception {
+		// all arguments are forwarded unchanged; only the endpoint name and the runner factory are fixed here
+		return new StandaloneDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME,
+			configuration,
+			highAvailabilityServices,
+			resourceManagerGateway,
+			blobServer,
+			heartbeatServices,
+			jobManagerMetricGroup,
+			metricQueryServicePath,
+			archivedExecutionGraphStore,
+			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			fatalErrorHandler,
+			restAddress,
+			historyServerArchivist);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
new file mode 100644
index 0000000..af02729
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsync {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ClusterComponent.class);
+
+	private final Object lock = new Object();
+
+	private final DispatcherFactory<T> dispatcherFactory;
+
+	private final ResourceManagerFactory<?> resourceManagerFactory;
+
+	private final RestEndpointFactory<?> restEndpointFactory;
+
+	private final CompletableFuture<Void> terminationFuture;
+
+	private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+	@GuardedBy("lock")
+	private State state;
+
+	@GuardedBy("lock")
+	private ResourceManager<?> resourceManager;
+
+	@GuardedBy("lock")
+	private T dispatcher;
+
+	@GuardedBy("lock")
+	private LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+	@GuardedBy("lock")
+	private LeaderRetrievalService resourceManagerRetrievalService;
+
+	@GuardedBy("lock")
+	private WebMonitorEndpoint<?> webMonitorEndpoint;
+
+	@GuardedBy("lock")
+	private JobManagerMetricGroup jobManagerMetricGroup;
+
+	public ClusterComponent(
+			DispatcherFactory<T> dispatcherFactory,
+			ResourceManagerFactory<?> resourceManagerFactory,
+			RestEndpointFactory<?> restEndpointFactory) {
+		this.state = State.CREATED;
+		this.dispatcherFactory = dispatcherFactory;
+		this.resourceManagerFactory = resourceManagerFactory;
+		this.restEndpointFactory = restEndpointFactory;
+		this.shutDownFuture = new CompletableFuture<>();
+		this.terminationFuture = new CompletableFuture<>();
+		// A finished termination also settles the shut down future, unless it has been completed before.
+		terminationFuture.whenComplete(
+			(ignored, failure) -> {
+				if (failure == null) {
+					shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+				} else {
+					shutDownFuture.completeExceptionally(failure);
+				}
+			});
+	}
+
+	public T getDispatcher() {
+		synchronized (lock) {
+			return dispatcher; // only assigned by startComponent, hence null before the component has been started
+		}
+	}
+
+	public CompletableFuture<Void> getTerminationFuture() {
+		return terminationFuture; // completed by closeAsync(), either directly or after the started components have terminated
+	}
+
+	public CompletableFuture<ApplicationStatus> getShutDownFuture() {
+		return shutDownFuture; // completed via registerShutDownFuture's callback or when the termination future completes
+	}
+
+	public void startComponent(
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			ArchivedExecutionGraphStore archivedExecutionGraphStore,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		synchronized (lock) {
+			Preconditions.checkState(state == State.CREATED); // the state never returns to CREATED, so starting works only once
+			state = State.RUNNING;
+			// The leader retrieval services are obtained up front but only started at the end of this method.
+			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+
+			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
+
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+				rpcService,
+				DispatcherGateway.class,
+				DispatcherId::fromUuid,
+				10,
+				Time.milliseconds(50L));
+
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+				rpcService,
+				ResourceManagerGateway.class,
+				ResourceManagerId::fromUuid,
+				10,
+				Time.milliseconds(50L));
+
+			// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint (the cast requires an AkkaRpcService)
+			final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+			final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
+				configuration,
+				dispatcherGatewayRetriever,
+				resourceManagerGatewayRetriever,
+				blobServer,
+				rpcService.getExecutor(),
+				new AkkaQueryServiceRetriever(actorSystem, timeout),
+				highAvailabilityServices.getWebMonitorLeaderElectionService(),
+				fatalErrorHandler);
+
+			LOG.debug("Starting Dispatcher REST endpoint.");
+			webMonitorEndpoint.start();
+			// The REST base URL of the endpoint is passed on to the ResourceManager and the Dispatcher below.
+			resourceManager = resourceManagerFactory.createResourceManager(
+				configuration,
+				ResourceID.generate(),
+				rpcService,
+				highAvailabilityServices,
+				heartbeatServices,
+				metricRegistry,
+				fatalErrorHandler,
+				new ClusterInformation(rpcService.getAddress(), blobServer.getPort()),
+				webMonitorEndpoint.getRestBaseUrl());
+
+			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
+				metricRegistry,
+				rpcService.getAddress(),
+				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+
+			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
+
+			dispatcher = dispatcherFactory.createDispatcher(
+				configuration,
+				rpcService,
+				highAvailabilityServices,
+				resourceManager.getSelfGateway(ResourceManagerGateway.class),
+				blobServer,
+				heartbeatServices,
+				jobManagerMetricGroup,
+				metricRegistry.getMetricQueryServicePath(),
+				archivedExecutionGraphStore,
+				fatalErrorHandler,
+				webMonitorEndpoint.getRestBaseUrl(),
+				historyServerArchivist);
+			// Hook the dispatcher into the shut down future before anything is started.
+			registerShutDownFuture(dispatcher, shutDownFuture);
+
+			LOG.debug("Starting ResourceManager.");
+			resourceManager.start();
+			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
+
+			LOG.debug("Starting Dispatcher.");
+			dispatcher.start();
+			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+		}
+	}
+
+	protected void registerShutDownFuture(T dispatcher, CompletableFuture<ApplicationStatus> shutDownFuture) {
+			dispatcher
+				.getTerminationFuture()
+				.whenComplete(
+					(aVoid, throwable) -> {
+						if (throwable != null) {
+							shutDownFuture.completeExceptionally(throwable);
+						} else {
+							shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+						}
+					});
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		synchronized (lock) {
+			if (state == State.RUNNING) {
+				Exception exception = null;
+
+				final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
+				// Stop the leader retrieval services first; failures are collected and reported via the termination future.
+				if (dispatcherLeaderRetrievalService != null) {
+					try {
+						dispatcherLeaderRetrievalService.stop();
+					} catch (Exception e) {
+						exception = ExceptionUtils.firstOrSuppressed(e, exception);
+					}
+				}
+
+				if (resourceManagerRetrievalService != null) {
+					try {
+						resourceManagerRetrievalService.stop();
+					} catch (Exception e) {
+						exception = ExceptionUtils.firstOrSuppressed(e, exception);
+					}
+				}
+				// Trigger the shut down of the remaining components and collect their termination futures.
+				if (webMonitorEndpoint != null) {
+					terminationFutures.add(webMonitorEndpoint.closeAsync());
+				}
+
+				if (dispatcher != null) {
+					dispatcher.shutDown();
+					terminationFutures.add(dispatcher.getTerminationFuture());
+				}
+
+				if (resourceManager != null) {
+					resourceManager.shutDown();
+					terminationFutures.add(resourceManager.getTerminationFuture());
+				}
+
+				if (exception != null) {
+					terminationFutures.add(FutureUtils.completedExceptionally(exception));
+				}
+
+				final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+				final CompletableFuture<Void> metricGroupTerminationFuture;
+				// The metric group is closed via FutureUtils.runAfterwards on the component termination future.
+				if (jobManagerMetricGroup != null) {
+					metricGroupTerminationFuture = FutureUtils.runAfterwards(
+						componentTerminationFuture,
+						() -> {
+							synchronized (lock) {
+								jobManagerMetricGroup.close();
+							}
+						});
+				} else {
+					metricGroupTerminationFuture = componentTerminationFuture;
+				}
+
+				metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
+					if (throwable != null) {
+						terminationFuture.completeExceptionally(throwable);
+					} else {
+						terminationFuture.complete(aVoid);
+					}
+				});
+			} else if (state == State.CREATED) {
+				terminationFuture.complete(null);
+			}
+			// An already terminated component skips both branches and just returns the same termination future.
+			state = State.TERMINATED;
+			return terminationFuture;
+		}
+	}
+
+	/**
+	 * Deregister the Flink application from the resource management system by signalling
+	 * the {@link ResourceManager}.
+	 *
+	 * @param applicationStatus to terminate the application with
+	 * @param diagnostics additional information about the shut down, can be {@code null}
+	 * @return Future which is completed once the deregistration call has completed (immediately if there is no {@link ResourceManager})
+	 */
+	public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
+		synchronized (lock) {
+			if (resourceManager != null) {
+				final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+				return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+			} else {
+				return CompletableFuture.completedFuture(null);
+			}
+		}
+	}
+
+	enum State {
+		CREATED, // initial state, set by the constructor
+		RUNNING, // set by startComponent
+		TERMINATED // set by closeAsync
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 1a8c058..39e1265 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -32,32 +32,19 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -65,11 +52,6 @@ import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -109,7 +91,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);
 
-	protected static final int SUCCESS_RETURN_CODE = 0;
 	protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
 	protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
 
@@ -125,6 +106,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
 	@GuardedBy("lock")
+	private ClusterComponent<?> dispatcherComponent;
+
+	@GuardedBy("lock")
 	private MetricRegistryImpl metricRegistry;
 
 	@GuardedBy("lock")
@@ -140,32 +124,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	private RpcService commonRpcService;
 
 	@GuardedBy("lock")
-	private ResourceManager<?> resourceManager;
-
-	@GuardedBy("lock")
-	private Dispatcher dispatcher;
-
-	@GuardedBy("lock")
-	private LeaderRetrievalService dispatcherLeaderRetrievalService;
-
-	@GuardedBy("lock")
-	private LeaderRetrievalService resourceManagerRetrievalService;
-
-	@GuardedBy("lock")
-	private WebMonitorEndpoint<?> webMonitorEndpoint;
-
-	@GuardedBy("lock")
 	private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
 	@GuardedBy("lock")
 	private TransientBlobCache transientBlobCache;
 
-	@GuardedBy("lock")
-	private ClusterInformation clusterInformation;
-
-	@GuardedBy("lock")
-	private JobManagerMetricGroup jobManagerMetricGroup;
-
 	private final Thread shutDownHook;
 
 	protected ClusterEntrypoint(Configuration configuration) {
@@ -223,7 +186,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return SecurityUtils.getInstalledContext();
 	}
 
-	protected void runCluster(Configuration configuration) throws Exception {
+	private void runCluster(Configuration configuration) throws Exception {
 		synchronized (lock) {
 			initializeServices(configuration);
 
@@ -231,27 +194,35 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
 			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
 
-			startClusterComponents(
+			dispatcherComponent = createDispatcherComponent(configuration);
+
+			dispatcherComponent.startComponent(
 				configuration,
 				commonRpcService,
 				haServices,
 				blobServer,
 				heartbeatServices,
-				metricRegistry);
+				metricRegistry,
+				archivedExecutionGraphStore,
+				this);
 
-			dispatcher.getTerminationFuture().whenComplete(
-				(Void value, Throwable throwable) -> {
+			dispatcherComponent.getShutDownFuture().whenComplete(
+				(ApplicationStatus applicationStatus, Throwable throwable) -> {
 					if (throwable != null) {
-						LOG.info("Could not properly terminate the Dispatcher.", throwable);
+						shutDownAndTerminate(
+							RUNTIME_FAILURE_RETURN_CODE,
+							ApplicationStatus.UNKNOWN,
+							ExceptionUtils.stringifyException(throwable),
+							false);
+					} else {
+						// This is the general shutdown path. If a separate more specific shutdown was
+						// already triggered, this will do nothing
+						shutDownAndTerminate(
+							applicationStatus.processExitCode(),
+							applicationStatus,
+							null,
+							true);
 					}
-
-					// This is the general shutdown path. If a separate more specific shutdown was
-					// already triggered, this will do nothing
-					shutDownAndTerminate(
-						SUCCESS_RETURN_CODE,
-						ApplicationStatus.SUCCEEDED,
-						throwable != null ? throwable.getMessage() : null,
-						true);
 				});
 		}
 	}
@@ -283,99 +254,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
 			archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
 
-			clusterInformation = new ClusterInformation(
-				commonRpcService.getAddress(),
-				blobServer.getPort());
-
 			transientBlobCache = new TransientBlobCache(
 				configuration,
 				new InetSocketAddress(
-					clusterInformation.getBlobServerHostname(),
-					clusterInformation.getBlobServerPort()));
-		}
-	}
-
-	protected void startClusterComponents(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
-		synchronized (lock) {
-			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
-
-			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
-
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
-				rpcService,
-				DispatcherGateway.class,
-				DispatcherId::fromUuid,
-				10,
-				Time.milliseconds(50L));
-
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
-				rpcService,
-				ResourceManagerGateway.class,
-				ResourceManagerId::fromUuid,
-				10,
-				Time.milliseconds(50L));
-
-			// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
-			final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
-			final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-
-			webMonitorEndpoint = createRestEndpoint(
-				configuration,
-				dispatcherGatewayRetriever,
-				resourceManagerGatewayRetriever,
-				transientBlobCache,
-				rpcService.getExecutor(),
-				new AkkaQueryServiceRetriever(actorSystem, timeout),
-				highAvailabilityServices.getWebMonitorLeaderElectionService());
-
-			LOG.debug("Starting Dispatcher REST endpoint.");
-			webMonitorEndpoint.start();
-
-			resourceManager = createResourceManager(
-				configuration,
-				ResourceID.generate(),
-				rpcService,
-				highAvailabilityServices,
-				heartbeatServices,
-				metricRegistry,
-				this,
-				clusterInformation,
-				webMonitorEndpoint.getRestBaseUrl());
-
-			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
-				metricRegistry,
-				rpcService.getAddress(),
-				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
-
-			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
-
-			dispatcher = createDispatcher(
-				configuration,
-				rpcService,
-				highAvailabilityServices,
-				resourceManager.getSelfGateway(ResourceManagerGateway.class),
-				blobServer,
-				heartbeatServices,
-				jobManagerMetricGroup,
-				metricRegistry.getMetricQueryServicePath(),
-				archivedExecutionGraphStore,
-				this,
-				webMonitorEndpoint.getRestBaseUrl(),
-				historyServerArchivist);
-
-			LOG.debug("Starting ResourceManager.");
-			resourceManager.start();
-			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
-
-			LOG.debug("Starting Dispatcher.");
-			dispatcher.start();
-			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+					commonRpcService.getAddress(),
+					blobServer.getPort()));
 		}
 	}
 
@@ -477,63 +360,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		}
 	}
 
-	protected CompletableFuture<Void> stopClusterComponents() {
-		synchronized (lock) {
-
-			Exception exception = null;
-
-			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
-
-			if (dispatcherLeaderRetrievalService != null) {
-				try {
-					dispatcherLeaderRetrievalService.stop();
-				} catch (Exception e) {
-					exception = ExceptionUtils.firstOrSuppressed(e, exception);
-				}
-			}
-
-			if (resourceManagerRetrievalService != null) {
-				try {
-					resourceManagerRetrievalService.stop();
-				} catch (Exception e) {
-					exception = ExceptionUtils.firstOrSuppressed(e, exception);
-				}
-			}
-
-			if (webMonitorEndpoint != null) {
-				terminationFutures.add(webMonitorEndpoint.closeAsync());
-			}
-
-			if (dispatcher != null) {
-				dispatcher.shutDown();
-				terminationFutures.add(dispatcher.getTerminationFuture());
-			}
-
-			if (resourceManager != null) {
-				resourceManager.shutDown();
-				terminationFutures.add(resourceManager.getTerminationFuture());
-			}
-
-			if (exception != null) {
-				terminationFutures.add(FutureUtils.completedExceptionally(exception));
-			}
-
-			final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
-
-			if (jobManagerMetricGroup != null) {
-				return FutureUtils.runAfterwards(
-					componentTerminationFuture,
-					() -> {
-						synchronized (lock) {
-							jobManagerMetricGroup.close();
-						}
-					});
-			} else {
-				return componentTerminationFuture;
-			}
-		}
-	}
-
 	@Override
 	public void onFatalError(Throwable exception) {
 		LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
@@ -563,14 +389,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		if (isShutDown.compareAndSet(false, true)) {
 			LOG.info("Stopping {}.", getClass().getSimpleName());
 
-			final CompletableFuture<Void> shutDownApplicationFuture = deregisterApplication(applicationStatus, diagnostics);
-
-			final CompletableFuture<Void> componentShutdownFuture = FutureUtils.composeAfterwards(
-				shutDownApplicationFuture,
-				this::stopClusterComponents);
+			final CompletableFuture<Void> shutDownApplicationFuture = closeClusterComponent(applicationStatus, diagnostics);
 
 			final CompletableFuture<Void> serviceShutdownFuture = FutureUtils.composeAfterwards(
-				componentShutdownFuture,
+				shutDownApplicationFuture,
 				() -> stopClusterServices(cleanupHaData));
 
 			final CompletableFuture<Void> cleanupDirectoriesFuture = FutureUtils.runAfterwards(
@@ -597,10 +419,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		boolean cleanupHaData) {
 
 		if (isTerminating.compareAndSet(false, true)) {
-			LOG.info("Shut down and terminate {} with return code {} and application status {}.",
+			LOG.info("Shut down and terminate {} with return code {} and application status {}. Diagnostics {}.",
 				getClass().getSimpleName(),
 				returnCode,
-				applicationStatus);
+				applicationStatus,
+				diagnostics);
 
 			shutDownAsync(
 				cleanupHaData,
@@ -628,11 +451,12 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	 * @param diagnostics additional information about the shut down, can be {@code null}
 	 * @return Future which is completed once the shut down
 	 */
-	private CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
+	private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
 		synchronized (lock) {
-			if (resourceManager != null) {
-				final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-				return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+			if (dispatcherComponent != null) {
+				final CompletableFuture<Void> deregisterApplicationFuture = dispatcherComponent.deregisterApplication(applicationStatus, diagnostics);
+
+				return FutureUtils.runAfterwards(deregisterApplicationFuture, dispatcherComponent::closeAsync);
 			} else {
 				return CompletableFuture.completedFuture(null);
 			}
@@ -656,39 +480,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	// Abstract methods
 	// --------------------------------------------------
 
-	protected abstract Dispatcher createDispatcher(
-		Configuration configuration,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		ResourceManagerGateway resourceManagerGateway,
-		BlobServer blobServer,
-		HeartbeatServices heartbeatServices,
-		JobManagerMetricGroup jobManagerMetricGroup,
-		@Nullable String metricQueryServicePath,
-		ArchivedExecutionGraphStore archivedExecutionGraphStore,
-		FatalErrorHandler fatalErrorHandler,
-		@Nullable String restAddress,
-		HistoryServerArchivist historyServerArchivist) throws Exception;
-
-	protected abstract ResourceManager<?> createResourceManager(
-		Configuration configuration,
-		ResourceID resourceId,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
-		FatalErrorHandler fatalErrorHandler,
-		ClusterInformation clusterInformation,
-		@Nullable String webInterfaceUrl) throws Exception;
-
-	protected abstract WebMonitorEndpoint<?> createRestEndpoint(
-		Configuration configuration,
-		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-		TransientBlobService transientBlobService,
-		Executor executor,
-		MetricQueryServiceRetriever metricQueryServiceRetriever,
-		LeaderElectionService leaderElectionService) throws Exception;
+	protected abstract ClusterComponent<?> createDispatcherComponent(Configuration configuration);
 
 	protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 		Configuration configuration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
new file mode 100644
index 0000000..7a194f6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * {@link JobGraphRetriever} implementation which retrieves the {@link JobGraph} from
+ * a file on disk.
+ */
+public class FileJobGraphRetriever implements JobGraphRetriever {
+
+	public static final ConfigOption<String> JOB_GRAPH_FILE_PATH = ConfigOptions
+		.key("internal.jobgraph-path")
+		.defaultValue("job.graph");
+
+	@Nonnull
+	private final String jobGraphFile;
+
+	public FileJobGraphRetriever(@Nonnull String jobGraphFile) {
+		this.jobGraphFile = jobGraphFile;
+	}
+
+	@Override
+	public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		File fp = new File(jobGraphFile);
+
+		try (FileInputStream input = new FileInputStream(fp);
+			ObjectInputStream obInput = new ObjectInputStream(input)) {
+
+			return (JobGraph) obInput.readObject();
+		} catch (FileNotFoundException e) {
+			throw new FlinkException("Could not find the JobGraph file.", e);
+		} catch (ClassNotFoundException | IOException e) {
+			throw new FlinkException("Could not load the JobGraph from file.", e);
+		}
+	}
+
+	public static FileJobGraphRetriever createFrom(Configuration configuration) {
+		return new FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java
new file mode 100644
index 0000000..17583ac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link ClusterComponent} for a job cluster. The dispatcher component starts
+ * a {@link MiniDispatcher}.
+ */
+public class JobClusterComponent extends ClusterComponent<MiniDispatcher> {
+
+	public JobClusterComponent(ResourceManagerFactory<?> resourceManagerFactory, JobGraphRetriever jobActions) {
+		super(new JobDispatcherFactory(jobActions), resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
+	}
+
+	@Override
+	protected void registerShutDownFuture(MiniDispatcher dispatcher, CompletableFuture<ApplicationStatus> shutDownFuture) {
+		super.registerShutDownFuture(dispatcher, shutDownFuture);
+
+		dispatcher.getJobTerminationFuture().whenComplete((applicationStatus, throwable) -> {
+			if (throwable != null) {
+				shutDownFuture.completeExceptionally(throwable);
+			} else {
+				shutDownFuture.complete(applicationStatus);
+			}
+		});
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 80a9da2..0426df9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -19,35 +19,9 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MiniDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.FlinkException;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for per-job cluster entry points.
@@ -59,81 +33,9 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 	}
 
 	@Override
-	protected MiniDispatcherRestEndpoint createRestEndpoint(
-			Configuration configuration,
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-			TransientBlobService transientBlobService,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
-			LeaderElectionService leaderElectionService) throws Exception {
-		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
-
-		return new MiniDispatcherRestEndpoint(
-			RestServerEndpointConfiguration.fromConfiguration(configuration),
-			dispatcherGatewayRetriever,
-			configuration,
-			restHandlerConfiguration,
-			resourceManagerGatewayRetriever,
-			transientBlobService,
-			executor,
-			metricQueryServiceRetriever,
-			leaderElectionService,
-			this);
-	}
-
-	@Override
 	protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 			Configuration configuration,
 			ScheduledExecutor scheduledExecutor) {
 		return new MemoryArchivedExecutionGraphStore();
 	}
-
-	@Override
-	protected Dispatcher createDispatcher(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			ResourceManagerGateway resourceManagerGateway,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			JobManagerMetricGroup jobManagerMetricGroup,
-			@Nullable String metricQueryServicePath,
-			ArchivedExecutionGraphStore archivedExecutionGraphStore,
-			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress,
-			HistoryServerArchivist historyServerArchivist) throws Exception {
-
-		final JobGraph jobGraph = retrieveJobGraph(configuration);
-
-		final String executionModeValue = configuration.getString(EXECUTION_MODE);
-
-		final ExecutionMode executionMode = ExecutionMode.valueOf(executionModeValue);
-
-		final MiniDispatcher dispatcher = new MiniDispatcher(
-			rpcService,
-			Dispatcher.DISPATCHER_NAME,
-			configuration,
-			highAvailabilityServices,
-			resourceManagerGateway,
-			blobServer,
-			heartbeatServices,
-			jobManagerMetricGroup,
-			metricQueryServicePath,
-			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-			fatalErrorHandler,
-			restAddress,
-			historyServerArchivist,
-			jobGraph,
-			executionMode);
-
-		registerShutdownActions(dispatcher.getJobTerminationFuture());
-
-		return dispatcher;
-	}
-
-	protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
-
-	protected abstract void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
new file mode 100644
index 0000000..e2ace15
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Interface which allows retrieving the {@link JobGraph}.
+ */
+public interface JobGraphRetriever {
+
+	/**
+	 * Retrieve the {@link JobGraph}.
+	 *
+	 * @param configuration cluster configuration
+	 * @return the retrieved {@link JobGraph}.
+	 * @throws FlinkException if the {@link JobGraph} could not be retrieved
+	 */
+	JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java
new file mode 100644
index 0000000..8ab0701
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+
+/**
+ * {@link ClusterComponent} used by session clusters.
+ */
+public class SessionClusterComponent extends ClusterComponent<Dispatcher> {
+
+	public SessionClusterComponent(ResourceManagerFactory<?> resourceManagerFactory) {
+		super(SessionDispatcherFactory.INSTANCE, resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 40eb8b7..1fb693c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -22,35 +22,14 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
-import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 
-import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for session cluster entry points.
@@ -77,62 +56,4 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			scheduledExecutor,
 			Ticker.systemTicker());
 	}
-
-	@Override
-	protected DispatcherRestEndpoint createRestEndpoint(
-			Configuration configuration,
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-			TransientBlobService transientBlobService,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
-			LeaderElectionService leaderElectionService) throws Exception {
-
-		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
-
-		return new DispatcherRestEndpoint(
-			RestServerEndpointConfiguration.fromConfiguration(configuration),
-			dispatcherGatewayRetriever,
-			configuration,
-			restHandlerConfiguration,
-			resourceManagerGatewayRetriever,
-			transientBlobService,
-			executor,
-			metricQueryServiceRetriever,
-			leaderElectionService,
-			this);
-	}
-
-	@Override
-	protected Dispatcher createDispatcher(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			ResourceManagerGateway resourceManagerGateway,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			JobManagerMetricGroup jobManagerMetricGroup,
-			@Nullable String metricQueryServicePath,
-			ArchivedExecutionGraphStore archivedExecutionGraphStore,
-			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress,
-			HistoryServerArchivist historyServerArchivist) throws Exception {
-
-		// create the default dispatcher
-		return new StandaloneDispatcher(
-			rpcService,
-			Dispatcher.DISPATCHER_NAME,
-			configuration,
-			highAvailabilityServices,
-			resourceManagerGateway,
-			blobServer,
-			heartbeatServices,
-			jobManagerMetricGroup,
-			metricQueryServicePath,
-			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-			fatalErrorHandler,
-			restAddress,
-			historyServerArchivist);
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index edff87b..e92248c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -19,24 +19,12 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
-import javax.annotation.Nullable;
-
 /**
  * Entry point for the standalone session cluster.
  */
@@ -47,33 +35,8 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			resourceManagerRuntimeServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new StandaloneResourceManager(
-			rpcService,
-			FlinkResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManager(),
-			metricRegistry,
-			resourceManagerRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler);
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE);
 	}
 
 	public static void main(String[] args) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
new file mode 100644
index 0000000..91a7b26
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManager} factory.
+ *
+ * @param <T> type of the workers of the ResourceManager
+ */
+public interface ResourceManagerFactory<T extends ResourceIDRetrievable> {
+
+	ResourceManager<T> createResourceManager(
+		Configuration configuration,
+		ResourceID resourceId,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		FatalErrorHandler fatalErrorHandler,
+		ClusterInformation clusterInformation,
+		@Nullable String webInterfaceUrl) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
similarity index 56%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index edff87b..c8e314f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -16,38 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
 
 import javax.annotation.Nullable;
 
 /**
- * Entry point for the standalone session cluster.
+ * {@link ResourceManagerFactory} which creates a {@link StandaloneResourceManager}.
  */
-public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {
-
-	public StandaloneSessionClusterEntrypoint(Configuration configuration) {
-		super(configuration);
-	}
+public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<ResourceID> {
+	INSTANCE;
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
+	public ResourceManager<ResourceID> createResourceManager(
 			Configuration configuration,
 			ResourceID resourceId,
 			RpcService rpcService,
@@ -75,28 +65,4 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 			clusterInformation,
 			fatalErrorHandler);
 	}
-
-	public static void main(String[] args) {
-		// startup checks and logging
-		EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
-		SignalHandler.register(LOG);
-		JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-		EntrypointClusterConfiguration entrypointClusterConfiguration = null;
-		final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
-
-		try {
-			entrypointClusterConfiguration = commandLineParser.parse(args);
-		} catch (FlinkParseException e) {
-			LOG.error("Could not parse command line arguments {}.", args, e);
-			commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
-			System.exit(1);
-		}
-
-		Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
-
-		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
-
-		entrypoint.startCluster();
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
new file mode 100644
index 0000000..da4b063
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link MiniDispatcherRestEndpoint}.
+ *
+ * <p>Implemented as an enum with the single constant {@link #INSTANCE}, so that constant
+ * is the only instance of this factory.
+ */
+public enum JobRestEndpointFactory implements RestEndpointFactory<RestfulGateway> {
+	INSTANCE;
+
+	/**
+	 * Creates a {@link MiniDispatcherRestEndpoint}.
+	 *
+	 * <p>The given {@code configuration} is used three times: to derive the
+	 * {@link RestServerEndpointConfiguration}, to derive the {@link RestHandlerConfiguration},
+	 * and as a direct constructor argument. All other arguments are handed to the endpoint
+	 * constructor unchanged.
+	 *
+	 * @return the newly constructed {@link MiniDispatcherRestEndpoint}
+	 * @throws Exception if deriving the configurations or constructing the endpoint fails
+	 */
+	@Override
+	public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(
+			Configuration configuration,
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			TransientBlobService transientBlobService,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			LeaderElectionService leaderElectionService,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+		return new MiniDispatcherRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			dispatcherGatewayRetriever,
+			configuration,
+			restHandlerConfiguration,
+			resourceManagerGatewayRetriever,
+			transientBlobService,
+			executor,
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
new file mode 100644
index 0000000..ffdc0cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link WebMonitorEndpoint} factory.
+ *
+ * <p>Declares a single creation method; each implementation decides which concrete
+ * {@link WebMonitorEndpoint} it builds from the supplied configuration and services.
+ *
+ * @param <T> type of the {@link RestfulGateway}
+ */
+public interface RestEndpointFactory<T extends RestfulGateway> {
+
+	/**
+	 * Creates a {@link WebMonitorEndpoint} from the given configuration and services.
+	 *
+	 * @param configuration configuration made available to the implementation
+	 * @param dispatcherGatewayRetriever retriever for the leading {@link DispatcherGateway}
+	 * @param resourceManagerGatewayRetriever retriever for the leading {@link ResourceManagerGateway}
+	 * @param transientBlobService transient blob service made available to the implementation
+	 * @param executor executor made available to the implementation
+	 * @param metricQueryServiceRetriever metric query service retriever made available to the implementation
+	 * @param leaderElectionService leader election service made available to the implementation
+	 * @param fatalErrorHandler fatal error handler made available to the implementation
+	 * @return the created {@link WebMonitorEndpoint}
+	 * @throws Exception if the implementation fails to create the endpoint
+	 */
+	WebMonitorEndpoint<T> createRestEndpoint(
+		Configuration configuration,
+		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+		TransientBlobService transientBlobService,
+		Executor executor,
+		MetricQueryServiceRetriever metricQueryServiceRetriever,
+		LeaderElectionService leaderElectionService,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
new file mode 100644
index 0000000..359efbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}.
+ *
+ * <p>Implemented as an enum with the single constant {@link #INSTANCE}, so that constant
+ * is the only instance of this factory.
+ */
+public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
+	INSTANCE;
+
+	/**
+	 * Creates a {@link DispatcherRestEndpoint}.
+	 *
+	 * <p>The given {@code configuration} is used three times: to derive the
+	 * {@link RestServerEndpointConfiguration}, to derive the {@link RestHandlerConfiguration},
+	 * and as a direct constructor argument. All other arguments are handed to the endpoint
+	 * constructor unchanged.
+	 *
+	 * @return the newly constructed {@link DispatcherRestEndpoint}
+	 * @throws Exception if deriving the configurations or constructing the endpoint fails
+	 */
+	@Override
+	public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
+			Configuration configuration,
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			TransientBlobService transientBlobService,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			LeaderElectionService leaderElectionService,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+		return new DispatcherRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			dispatcherGatewayRetriever,
+			configuration,
+			restHandlerConfiguration,
+			resourceManagerGatewayRetriever,
+			transientBlobService,
+			executor,
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 482e2b7..a52975a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,48 +19,27 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Entry point for Yarn per-job clusters.
  */
 public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 
-	/** The job graph file path. */
-	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
 	private final String workingDirectory;
 
 	public YarnJobClusterEntrypoint(
@@ -82,58 +61,10 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new YarnResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			configuration,
-			System.getenv(),
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			webInterfaceUrl);
-	}
-
-	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-		File fp = new File(jobGraphFile);
-
-		try (FileInputStream input = new FileInputStream(fp);
-			ObjectInputStream obInput = new ObjectInputStream(input)) {
-
-			return (JobGraph) obInput.readObject();
-		} catch (FileNotFoundException e) {
-			throw new FlinkException("Could not find the JobGraph file.", e);
-		} catch (ClassNotFoundException | IOException e) {
-			throw new FlinkException("Could not load the JobGraph from file.", e);
-		}
-	}
-
-	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
-		terminationFuture.thenAccept((status) ->
-			shutDownAndTerminate(status.processExitCode(), status, null, true));
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		// Per-job entry point: combine the YARN resource manager factory with a
+		// FileJobGraphRetriever that is created from the given configuration.
+		return new JobClusterComponent(
+			YarnResourceManagerFactory.INSTANCE,
+			FileJobGraphRetriever.createFrom(configuration));
 
 	// ------------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
similarity index 54%
copy from flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
copy to flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index 6c92861..bfd1b4a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -21,56 +21,28 @@ package org.apache.flink.yarn.entrypoint;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.security.SecurityContext;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.YarnResourceManager;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.flink.yarn.YarnWorkerNode;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.util.Map;
-
 /**
- * Entry point for Yarn session clusters.
+ * {@link ResourceManagerFactory} implementation which creates a {@link YarnResourceManager}.
  */
-public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
-
-	private final String workingDirectory;
-
-	public YarnSessionClusterEntrypoint(
-			Configuration configuration,
-			String workingDirectory) {
-		super(configuration);
-		this.workingDirectory = Preconditions.checkNotNull(workingDirectory);
-	}
+public enum YarnResourceManagerFactory implements ResourceManagerFactory<YarnWorkerNode> {
+	INSTANCE;
 
 	@Override
-	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
-		return YarnEntrypointUtils.installSecurityContext(configuration, workingDirectory);
-	}
-
-	@Override
-	protected String getRPCPortRange(Configuration configuration) {
-		return configuration.getString(YarnConfigOptions.APPLICATION_MASTER_PORT);
-	}
-
-	@Override
-	protected ResourceManager<?> createResourceManager(
+	public ResourceManager<YarnWorkerNode> createResourceManager(
 			Configuration configuration,
 			ResourceID resourceId,
 			RpcService rpcService,
@@ -101,33 +73,4 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			fatalErrorHandler,
 			webInterfaceUrl);
 	}
-
-	public static void main(String[] args) {
-		// startup checks and logging
-		EnvironmentInformation.logEnvironmentInfo(LOG, YarnSessionClusterEntrypoint.class.getSimpleName(), args);
-		SignalHandler.register(LOG);
-		JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-		Map<String, String> env = System.getenv();
-
-		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
-		Preconditions.checkArgument(
-			workingDirectory != null,
-			"Working directory variable (%s) not set",
-			ApplicationConstants.Environment.PWD.key());
-
-		try {
-			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
-		} catch (IOException e) {
-			LOG.warn("Could not log YARN environment information.", e);
-		}
-
-		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
-
-		YarnSessionClusterEntrypoint yarnSessionClusterEntrypoint = new YarnSessionClusterEntrypoint(
-			configuration,
-			workingDirectory);
-
-		yarnSessionClusterEntrypoint.startCluster();
-	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 6c92861..116e2ff 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -19,29 +19,18 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -70,36 +59,8 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new YarnResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			configuration,
-			System.getenv(),
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			webInterfaceUrl);
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		// Session entry point: only the YARN resource manager factory is supplied;
+		// the configuration argument is not consulted here.
+		return new SessionClusterComponent(YarnResourceManagerFactory.INSTANCE);
 	}
 
 	public static void main(String[] args) {