You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:43:10 UTC

[flink] branch master updated (e663990 -> 9f5fd07)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from e663990  [hotfix] Start MesosWorkers with default ContaineredTaskManagerConfiguration
     new 1fcbc9f  [FLINK-10411] Make ClusterEntrypoint more compositional
     new b9ebf42  [FLINK-10411] Move System.exit out of ClusterEntrypoint
     new 54c1b19  [FLINK-10411] Rename ClusterComponent into DispatcherResourceManagerComponent
     new 9f5fd07  [FLINK-10411] Introduce DispatcherResourceManagerComponentFactory

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../entrypoint/ClassPathJobGraphRetriever.java     |  80 +++++
 .../entrypoint/StandaloneJobClusterEntryPoint.java |  87 +----
 ...st.java => ClassPathJobGraphRetrieverTest.java} |  18 +-
 .../apache/flink/container/entrypoint/TestJob.java |   2 +-
 .../entrypoint/MesosJobClusterEntrypoint.java      |  92 +----
 .../entrypoint/MesosSessionClusterEntrypoint.java  |  65 +---
 .../MesosResourceManagerFactory.java               |  90 +++++
 .../runtime/dispatcher/DispatcherFactory.java      |  53 +++
 .../runtime/dispatcher/JobDispatcherFactory.java   |  86 +++++
 .../dispatcher/SessionDispatcherFactory.java       |  69 ++++
 .../runtime/entrypoint/ClusterEntrypoint.java      | 384 +++++----------------
 .../entrypoint/ClusterEntrypointException.java     |  40 +++
 .../runtime/entrypoint/JobClusterEntrypoint.java   |  98 ------
 .../entrypoint/SessionClusterEntrypoint.java       |  79 -----
 .../StandaloneSessionClusterEntrypoint.java        |  47 +--
 ...tDispatcherResourceManagerComponentFactory.java | 254 ++++++++++++++
 .../DispatcherResourceManagerComponent.java        | 180 ++++++++++
 .../DispatcherResourceManagerComponentFactory.java |  45 +++
 .../component/FileJobGraphRetriever.java           |  70 ++++
 .../JobDispatcherResourceManagerComponent.java     |  55 +++
 ...bDispatcherResourceManagerComponentFactory.java |  58 ++++
 .../entrypoint/component/JobGraphRetriever.java    |  38 ++
 .../SessionDispatcherResourceManagerComponent.java |  40 +++
 ...nDispatcherResourceManagerComponentFactory.java |  58 ++++
 .../resourcemanager/ResourceManagerFactory.java    |  50 +++
 .../StandaloneResourceManagerFactory.java}         |  46 +--
 .../flink/runtime/rest/JobRestEndpointFactory.java |  66 ++++
 .../flink/runtime/rest/RestEndpointFactory.java    |  50 +++
 .../runtime/rest/SessionRestEndpointFactory.java   |  65 ++++
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |  86 +----
 ...ypoint.java => YarnResourceManagerFactory.java} |  69 +---
 .../entrypoint/YarnSessionClusterEntrypoint.java   |  50 +--
 32 files changed, 1621 insertions(+), 949 deletions(-)
 create mode 100644 flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 rename flink-container/src/test/java/org/apache/flink/container/entrypoint/{StandaloneJobClusterEntryPointTest.java => ClassPathJobGraphRetrieverTest.java} (81%)
 create mode 100644 flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{entrypoint/StandaloneSessionClusterEntrypoint.java => resourcemanager/StandaloneResourceManagerFactory.java} (56%)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
 copy flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/{YarnSessionClusterEntrypoint.java => YarnResourceManagerFactory.java} (54%)


[flink] 04/04: [FLINK-10411] Introduce DispatcherResourceManagerComponentFactory

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f5fd073bd128006571c9e788c87938125fef52d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 27 14:17:21 2018 +0200

    [FLINK-10411] Introduce DispatcherResourceManagerComponentFactory
    
    This commit introduces the DispatcherResourceManagerComponentFactory which is used
    to create a DispatcherResourceManagerComponent. That way, it is possible to eagerly
    initialize all fields of the DispatcherResourceManagerComponent making it possible
    to make all fields final and remove the lock.
    
    This closes #6743.
---
 .../entrypoint/ClassPathJobGraphRetriever.java     |   2 +-
 .../entrypoint/StandaloneJobClusterEntryPoint.java |   8 +-
 .../entrypoint/MesosJobClusterEntrypoint.java      |  10 +-
 .../entrypoint/MesosSessionClusterEntrypoint.java  |  17 +-
 .../runtime/dispatcher/JobDispatcherFactory.java   |   2 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      |  10 +-
 .../SessionDispatcherResourceManagerComponent.java |  34 ---
 .../StandaloneSessionClusterEntrypoint.java        |   6 +-
 ...DispatcherResourceManagerComponentFactory.java} | 261 +++++++--------------
 .../DispatcherResourceManagerComponent.java        | 180 ++++++++++++++
 .../DispatcherResourceManagerComponentFactory.java |  45 ++++
 .../{ => component}/FileJobGraphRetriever.java     |   2 +-
 .../JobDispatcherResourceManagerComponent.java     |  28 ++-
 ...bDispatcherResourceManagerComponentFactory.java |  58 +++++
 .../{ => component}/JobGraphRetriever.java         |   2 +-
 .../SessionDispatcherResourceManagerComponent.java |  40 ++++
 ...nDispatcherResourceManagerComponentFactory.java |  58 +++++
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |  10 +-
 .../entrypoint/YarnSessionClusterEntrypoint.java   |   8 +-
 19 files changed, 522 insertions(+), 259 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 5ded04b..3e0645d 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FlinkException;
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index b81d992..6c42bf2 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -19,11 +19,11 @@
 package org.apache.flink.container.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
-import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
@@ -61,8 +61,8 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
-		return new JobClusterComponent(
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new JobDispatcherResourceManagerComponentFactory(
 			StandaloneResourceManagerFactory.INSTANCE,
 			new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
 	}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 922d5cd..377b5b5 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -28,11 +28,11 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
-import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -109,8 +109,8 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
-		return new JobClusterComponent(
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new JobDispatcherResourceManagerComponentFactory(
 			new MesosResourceManagerFactory(
 				mesosServices,
 				schedulerConfiguration,
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index f691940..9879628 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -28,10 +28,10 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -108,12 +108,13 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
-		return new SessionClusterComponent(new MesosResourceManagerFactory(
-			mesosServices,
-			mesosConfig,
-			taskManagerParameters,
-			taskManagerContainerSpec));
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new SessionDispatcherResourceManagerComponentFactory(
+			new MesosResourceManagerFactory(
+				mesosServices,
+				mesosConfig,
+				taskManagerParameters,
+				taskManagerContainerSpec));
 	}
 
 	public static void main(String[] args) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
index 488f2fc..e6b1a26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 54ccaec..c9a1722 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -38,6 +38,8 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -109,7 +111,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
 	@GuardedBy("lock")
-	private ClusterComponent<?> clusterComponent;
+	private DispatcherResourceManagerComponent<?> clusterComponent;
 
 	@GuardedBy("lock")
 	private MetricRegistryImpl metricRegistry;
@@ -204,9 +206,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
 			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
 
-			clusterComponent = createClusterComponent(configuration);
+			final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
 
-			clusterComponent.startComponent(
+			clusterComponent = dispatcherResourceManagerComponentFactory.create(
 				configuration,
 				commonRpcService,
 				haServices,
@@ -460,7 +462,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	// Abstract methods
 	// --------------------------------------------------
 
-	protected abstract ClusterComponent<?> createClusterComponent(Configuration configuration);
+	protected abstract DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration);
 
 	protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 		Configuration configuration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java
deleted file mode 100644
index 8ab0701..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.entrypoint;
-
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
-import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
-
-/**
- * {@link ClusterComponent} used by session clusters.
- */
-public class SessionClusterComponent extends ClusterComponent<Dispatcher> {
-
-	public SessionClusterComponent(ResourceManagerFactory<?> resourceManagerFactory) {
-		super(SessionDispatcherFactory.INSTANCE, resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index 1675235..127fc8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -35,8 +37,8 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 	}
 
 	@Override
-	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
-		return new SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE);
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
 	}
 
 	public static void main(String[] args) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
similarity index 51%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index af02729..0a37411 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherFactory;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -46,102 +46,54 @@ import org.apache.flink.runtime.rest.RestEndpointFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
-import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FlinkException;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
- * in the same process.
+ * Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components.
+ *
+ * @param <T> type of the {@link Dispatcher}
+ * @param <U> type of the {@link RestfulGateway} given to the {@link WebMonitorEndpoint}
  */
-public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsync {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ClusterComponent.class);
+public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway> implements DispatcherResourceManagerComponentFactory<T> {
 
-	private final Object lock = new Object();
+	private final Logger log = LoggerFactory.getLogger(getClass());
 
+	@Nonnull
 	private final DispatcherFactory<T> dispatcherFactory;
 
+	@Nonnull
 	private final ResourceManagerFactory<?> resourceManagerFactory;
 
-	private final RestEndpointFactory<?> restEndpointFactory;
-
-	private final CompletableFuture<Void> terminationFuture;
-
-	private final CompletableFuture<ApplicationStatus> shutDownFuture;
-
-	@GuardedBy("lock")
-	private State state;
-
-	@GuardedBy("lock")
-	private ResourceManager<?> resourceManager;
+	@Nonnull
+	private final RestEndpointFactory<U> restEndpointFactory;
 
-	@GuardedBy("lock")
-	private T dispatcher;
-
-	@GuardedBy("lock")
-	private LeaderRetrievalService dispatcherLeaderRetrievalService;
-
-	@GuardedBy("lock")
-	private LeaderRetrievalService resourceManagerRetrievalService;
-
-	@GuardedBy("lock")
-	private WebMonitorEndpoint<?> webMonitorEndpoint;
-
-	@GuardedBy("lock")
-	private JobManagerMetricGroup jobManagerMetricGroup;
-
-	public ClusterComponent(
-			DispatcherFactory<T> dispatcherFactory,
-			ResourceManagerFactory<?> resourceManagerFactory,
-			RestEndpointFactory<?> restEndpointFactory) {
+	public AbstractDispatcherResourceManagerComponentFactory(
+			@Nonnull DispatcherFactory<T> dispatcherFactory,
+			@Nonnull ResourceManagerFactory<?> resourceManagerFactory,
+			@Nonnull RestEndpointFactory<U> restEndpointFactory) {
 		this.dispatcherFactory = dispatcherFactory;
 		this.resourceManagerFactory = resourceManagerFactory;
 		this.restEndpointFactory = restEndpointFactory;
-		this.terminationFuture = new CompletableFuture<>();
-		this.shutDownFuture = new CompletableFuture<>();
-		this.state = State.CREATED;
-
-		terminationFuture.whenComplete(
-			(aVoid, throwable) -> {
-				if (throwable != null) {
-					shutDownFuture.completeExceptionally(throwable);
-				} else {
-					shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
-				}
-			});
 	}
 
-	public T getDispatcher() {
-		synchronized (lock) {
-			return dispatcher;
-		}
-	}
-
-	public CompletableFuture<Void> getTerminationFuture() {
-		return terminationFuture;
-	}
-
-	public CompletableFuture<ApplicationStatus> getShutDownFuture() {
-		return shutDownFuture;
-	}
-
-	public void startComponent(
+	@Override
+	public DispatcherResourceManagerComponent<T> create(
 			Configuration configuration,
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
@@ -150,22 +102,27 @@ public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsyn
 			MetricRegistry metricRegistry,
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
-		synchronized (lock) {
-			Preconditions.checkState(state == State.CREATED);
-			state = State.RUNNING;
 
+		LeaderRetrievalService dispatcherLeaderRetrievalService = null;
+		LeaderRetrievalService resourceManagerRetrievalService = null;
+		WebMonitorEndpoint<U> webMonitorEndpoint = null;
+		ResourceManager<?> resourceManager = null;
+		JobManagerMetricGroup jobManagerMetricGroup = null;
+		T dispatcher = null;
+
+		try {
 			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 
 			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+			final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
 				rpcService,
 				DispatcherGateway.class,
 				DispatcherId::fromUuid,
 				10,
 				Time.milliseconds(50L));
 
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+			final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
 				rpcService,
 				ResourceManagerGateway.class,
 				ResourceManagerId::fromUuid,
@@ -186,7 +143,7 @@ public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsyn
 				highAvailabilityServices.getWebMonitorLeaderElectionService(),
 				fatalErrorHandler);
 
-			LOG.debug("Starting Dispatcher REST endpoint.");
+			log.debug("Starting Dispatcher REST endpoint.");
 			webMonitorEndpoint.start();
 
 			resourceManager = resourceManagerFactory.createResourceManager(
@@ -221,127 +178,77 @@ public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsyn
 				webMonitorEndpoint.getRestBaseUrl(),
 				historyServerArchivist);
 
-			registerShutDownFuture(dispatcher, shutDownFuture);
-
-			LOG.debug("Starting ResourceManager.");
+			log.debug("Starting ResourceManager.");
 			resourceManager.start();
 			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
 
-			LOG.debug("Starting Dispatcher.");
+			log.debug("Starting Dispatcher.");
 			dispatcher.start();
 			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
-		}
-	}
-
-	protected void registerShutDownFuture(T dispatcher, CompletableFuture<ApplicationStatus> shutDownFuture) {
-			dispatcher
-				.getTerminationFuture()
-				.whenComplete(
-					(aVoid, throwable) -> {
-						if (throwable != null) {
-							shutDownFuture.completeExceptionally(throwable);
-						} else {
-							shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
-						}
-					});
-	}
-
-	@Override
-	public CompletableFuture<Void> closeAsync() {
-		synchronized (lock) {
-			if (state == State.RUNNING) {
-				Exception exception = null;
-
-				final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
-
-				if (dispatcherLeaderRetrievalService != null) {
-					try {
-						dispatcherLeaderRetrievalService.stop();
-					} catch (Exception e) {
-						exception = ExceptionUtils.firstOrSuppressed(e, exception);
-					}
-				}
 
-				if (resourceManagerRetrievalService != null) {
-					try {
-						resourceManagerRetrievalService.stop();
-					} catch (Exception e) {
-						exception = ExceptionUtils.firstOrSuppressed(e, exception);
-					}
-				}
-
-				if (webMonitorEndpoint != null) {
-					terminationFutures.add(webMonitorEndpoint.closeAsync());
+			return createDispatcherResourceManagerComponent(
+				dispatcher,
+				resourceManager,
+				dispatcherLeaderRetrievalService,
+				resourceManagerRetrievalService,
+				webMonitorEndpoint,
+				jobManagerMetricGroup);
+
+		} catch (Exception exception) {
+			// clean up all started components
+			if (dispatcherLeaderRetrievalService != null) {
+				try {
+					dispatcherLeaderRetrievalService.stop();
+				} catch (Exception e) {
+					exception = ExceptionUtils.firstOrSuppressed(e, exception);
 				}
+			}
 
-				if (dispatcher != null) {
-					dispatcher.shutDown();
-					terminationFutures.add(dispatcher.getTerminationFuture());
+			if (resourceManagerRetrievalService != null) {
+				try {
+					resourceManagerRetrievalService.stop();
+				} catch (Exception e) {
+					exception = ExceptionUtils.firstOrSuppressed(e, exception);
 				}
+			}
 
-				if (resourceManager != null) {
-					resourceManager.shutDown();
-					terminationFutures.add(resourceManager.getTerminationFuture());
-				}
+			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
 
-				if (exception != null) {
-					terminationFutures.add(FutureUtils.completedExceptionally(exception));
-				}
+			if (webMonitorEndpoint != null) {
+				terminationFutures.add(webMonitorEndpoint.closeAsync());
+			}
 
-				final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+			if (resourceManager != null) {
+				resourceManager.shutDown();
+				terminationFutures.add(resourceManager.getTerminationFuture());
+			}
 
-				final CompletableFuture<Void> metricGroupTerminationFuture;
+			if (dispatcher != null) {
+				dispatcher.shutDown();
+				terminationFutures.add(dispatcher.getTerminationFuture());
+			}
 
-				if (jobManagerMetricGroup != null) {
-					metricGroupTerminationFuture = FutureUtils.runAfterwards(
-						componentTerminationFuture,
-						() -> {
-							synchronized (lock) {
-								jobManagerMetricGroup.close();
-							}
-						});
-				} else {
-					metricGroupTerminationFuture = componentTerminationFuture;
-				}
+			final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);
 
-				metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
-					if (throwable != null) {
-						terminationFuture.completeExceptionally(throwable);
-					} else {
-						terminationFuture.complete(aVoid);
-					}
-				});
-			} else if (state == State.CREATED) {
-				terminationFuture.complete(null);
+			try {
+				terminationFuture.get();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
 			}
 
-			state = State.TERMINATED;
-			return terminationFuture;
-		}
-	}
-
-	/**
-	 * Deregister the Flink application from the resource management system by signalling
-	 * the {@link ResourceManager}.
-	 *
-	 * @param applicationStatus to terminate the application with
-	 * @param diagnostics additional information about the shut down, can be {@code null}
-	 * @return Future which is completed once the shut down
-	 */
-	public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
-		synchronized (lock) {
-			if (resourceManager != null) {
-				final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-				return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
-			} else {
-				return CompletableFuture.completedFuture(null);
+			if (jobManagerMetricGroup != null) {
+				jobManagerMetricGroup.close();
 			}
+
+			throw new FlinkException("Could not create the DispatcherResourceManagerComponent.", exception);
 		}
 	}
 
-	enum State {
-		CREATED,
-		RUNNING,
-		TERMINATED
-	}
+	protected abstract DispatcherResourceManagerComponent<T> createDispatcherResourceManagerComponent(
+		T dispatcher,
+		ResourceManager<?> resourceManager,
+		LeaderRetrievalService dispatcherLeaderRetrievalService,
+		LeaderRetrievalService resourceManagerRetrievalService,
+		WebMonitorEndpoint<?> webMonitorEndpoint,
+		JobManagerMetricGroup jobManagerMetricGroup);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
new file mode 100644
index 0000000..94925b2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class DispatcherResourceManagerComponent<T extends Dispatcher> implements AutoCloseableAsync {
+
+	@Nonnull
+	private final T dispatcher;
+
+	@Nonnull
+	private final ResourceManager<?> resourceManager;
+
+	@Nonnull
+	private final LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+	@Nonnull
+	private final LeaderRetrievalService resourceManagerRetrievalService;
+
+	@Nonnull
+	private final WebMonitorEndpoint<?> webMonitorEndpoint;
+
+	@Nonnull
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
+	private final CompletableFuture<Void> terminationFuture;
+
+	private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+	private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+	DispatcherResourceManagerComponent(
+			@Nonnull T dispatcher,
+			@Nonnull ResourceManager<?> resourceManager,
+			@Nonnull LeaderRetrievalService dispatcherLeaderRetrievalService,
+			@Nonnull LeaderRetrievalService resourceManagerRetrievalService,
+			@Nonnull WebMonitorEndpoint<?> webMonitorEndpoint,
+			@Nonnull JobManagerMetricGroup jobManagerMetricGroup) {
+		this.resourceManager = resourceManager;
+		this.dispatcher = dispatcher;
+		this.dispatcherLeaderRetrievalService = dispatcherLeaderRetrievalService;
+		this.resourceManagerRetrievalService = resourceManagerRetrievalService;
+		this.webMonitorEndpoint = webMonitorEndpoint;
+		this.jobManagerMetricGroup = jobManagerMetricGroup;
+		this.terminationFuture = new CompletableFuture<>();
+		this.shutDownFuture = new CompletableFuture<>();
+
+		registerShutDownFuture();
+	}
+
+	private void registerShutDownFuture() {
+		terminationFuture.whenComplete(
+			(aVoid, throwable) -> {
+				if (throwable != null) {
+					shutDownFuture.completeExceptionally(throwable);
+				} else {
+					shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+				}
+			});
+
+		dispatcher
+			.getTerminationFuture()
+			.whenComplete(
+				(aVoid, throwable) -> {
+					if (throwable != null) {
+						shutDownFuture.completeExceptionally(throwable);
+					} else {
+						shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+					}
+				});
+	}
+
+	public CompletableFuture<Void> getTerminationFuture() {
+		return terminationFuture;
+	}
+
+	public final CompletableFuture<ApplicationStatus> getShutDownFuture() {
+		return shutDownFuture;
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		if (isRunning.compareAndSet(true, false)) {
+			Exception exception = null;
+
+			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
+
+			try {
+				dispatcherLeaderRetrievalService.stop();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			try {
+				resourceManagerRetrievalService.stop();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+
+			terminationFutures.add(webMonitorEndpoint.closeAsync());
+
+			dispatcher.shutDown();
+			terminationFutures.add(dispatcher.getTerminationFuture());
+
+			resourceManager.shutDown();
+			terminationFutures.add(resourceManager.getTerminationFuture());
+
+			if (exception != null) {
+				terminationFutures.add(FutureUtils.completedExceptionally(exception));
+			}
+
+			final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+			final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards(
+				componentTerminationFuture,
+				jobManagerMetricGroup::close);
+
+			metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
+				if (throwable != null) {
+					terminationFuture.completeExceptionally(throwable);
+				} else {
+					terminationFuture.complete(aVoid);
+				}
+			});
+		}
+
+		return terminationFuture;
+	}
+
+	/**
+	 * Deregister the Flink application from the resource management system by signalling
+	 * the {@link ResourceManager}.
+	 *
+	 * @param applicationStatus to terminate the application with
+	 * @param diagnostics additional information about the shut down, can be {@code null}
+	 * @return Future which is completed once the shut down
+	 */
+	public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
+		final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+		return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
new file mode 100644
index 0000000..df22a59
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+/**
+ * Factory for the {@link DispatcherResourceManagerComponent}.
+ */
+public interface DispatcherResourceManagerComponentFactory<T extends Dispatcher> {
+
+	DispatcherResourceManagerComponent<T> create(
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		BlobServer blobServer,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		ArchivedExecutionGraphStore archivedExecutionGraphStore,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
similarity index 97%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
index 7a194f6..1848408 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/FileJobGraphRetriever.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
similarity index 54%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
index 17583ac..c1df47f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponent.java
@@ -16,29 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
-import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 
 import java.util.concurrent.CompletableFuture;
 
 /**
- * {@link ClusterComponent} for a job cluster. The dispatcher component starts
+ * {@link DispatcherResourceManagerComponent} for a job cluster. The dispatcher component starts
  * a {@link MiniDispatcher}.
  */
-public class JobClusterComponent extends ClusterComponent<MiniDispatcher> {
+class JobDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent<MiniDispatcher> {
 
-	public JobClusterComponent(ResourceManagerFactory<?> resourceManagerFactory, JobGraphRetriever jobActions) {
-		super(new JobDispatcherFactory(jobActions), resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
-	}
+	JobDispatcherResourceManagerComponent(
+			MiniDispatcher dispatcher,
+			ResourceManager<?> resourceManager,
+			LeaderRetrievalService dispatcherLeaderRetrievalService,
+			LeaderRetrievalService resourceManagerRetrievalService,
+			WebMonitorEndpoint<?> webMonitorEndpoint,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+		super(dispatcher, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint, jobManagerMetricGroup);
 
-	@Override
-	protected void registerShutDownFuture(MiniDispatcher dispatcher, CompletableFuture<ApplicationStatus> shutDownFuture) {
-		super.registerShutDownFuture(dispatcher, shutDownFuture);
+		final CompletableFuture<ApplicationStatus> shutDownFuture = getShutDownFuture();
 
 		dispatcher.getJobTerminationFuture().whenComplete((applicationStatus, throwable) -> {
 			if (throwable != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 0000000..c7ce14c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link DispatcherResourceManagerComponentFactory} for a {@link JobDispatcherResourceManagerComponent}.
+ */
+public class JobDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory<MiniDispatcher, RestfulGateway> {
+
+	public JobDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory<?> resourceManagerFactory, @Nonnull JobGraphRetriever jobGraphRetriever) {
+		super(new JobDispatcherFactory(jobGraphRetriever), resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
+	}
+
+	@Override
+	protected DispatcherResourceManagerComponent<MiniDispatcher> createDispatcherResourceManagerComponent(
+			MiniDispatcher dispatcher,
+			ResourceManager<?> resourceManager,
+			LeaderRetrievalService dispatcherLeaderRetrievalService,
+			LeaderRetrievalService resourceManagerRetrievalService,
+			WebMonitorEndpoint<?> webMonitorEndpoint,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+		return new JobDispatcherResourceManagerComponent(
+			dispatcher,
+			resourceManager,
+			dispatcherLeaderRetrievalService,
+			resourceManagerRetrievalService,
+			webMonitorEndpoint,
+			jobManagerMetricGroup);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
similarity index 96%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
index e2ace15..b1586ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/JobGraphRetriever.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.entrypoint.component;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
new file mode 100644
index 0000000..8be7b74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+/**
+ * {@link DispatcherResourceManagerComponent} used by session clusters.
+ */
+class SessionDispatcherResourceManagerComponent extends DispatcherResourceManagerComponent<Dispatcher> {
+	SessionDispatcherResourceManagerComponent(
+			Dispatcher dispatcher,
+			ResourceManager<?> resourceManager,
+			LeaderRetrievalService dispatcherLeaderRetrievalService,
+			LeaderRetrievalService resourceManagerRetrievalService,
+			WebMonitorEndpoint<?> webMonitorEndpoint,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+		super(dispatcher, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint, jobManagerMetricGroup);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
new file mode 100644
index 0000000..c44833d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/SessionDispatcherResourceManagerComponentFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link DispatcherResourceManagerComponentFactory} for a {@link SessionDispatcherResourceManagerComponent}.
+ */
+public class SessionDispatcherResourceManagerComponentFactory extends AbstractDispatcherResourceManagerComponentFactory<Dispatcher, DispatcherGateway> {
+
+	public SessionDispatcherResourceManagerComponentFactory(@Nonnull ResourceManagerFactory<?> resourceManagerFactory) {
+		super(SessionDispatcherFactory.INSTANCE, resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
+	}
+
+	@Override
+	protected DispatcherResourceManagerComponent<Dispatcher> createDispatcherResourceManagerComponent(
+			Dispatcher dispatcher,
+			ResourceManager<?> resourceManager,
+			LeaderRetrievalService dispatcherLeaderRetrievalService,
+			LeaderRetrievalService resourceManagerRetrievalService,
+			WebMonitorEndpoint<?> webMonitorEndpoint,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+		return new SessionDispatcherResourceManagerComponent(
+			dispatcher,
+			resourceManager,
+			dispatcherLeaderRetrievalService,
+			resourceManagerRetrievalService,
+			webMonitorEndpoint,
+			jobManagerMetricGroup);
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 1733f49..42e666e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,11 +19,11 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
-import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.component.JobDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -62,8 +62,8 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
-		return new JobClusterComponent(
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new JobDispatcherResourceManagerComponentFactory(
 			YarnResourceManagerFactory.INSTANCE,
 			FileJobGraphRetriever.createFrom(configuration));
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index e0bebfd..0f4656e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -19,10 +19,10 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -60,8 +60,8 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
-		return new SessionClusterComponent(YarnResourceManagerFactory.INSTANCE);
+	protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
+		return new SessionDispatcherResourceManagerComponentFactory(YarnResourceManagerFactory.INSTANCE);
 	}
 
 	public static void main(String[] args) {


[flink] 01/04: [FLINK-10411] Make ClusterEntrypoint more compositional

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1fcbc9f816460b1a159fd79b4c9bcf83327d79c3
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Sep 23 13:39:34 2018 +0200

    [FLINK-10411] Make ClusterEntrypoint more compositional
    
    Introduce a ClusterComponent which encapsulates the logic for starting the cluster
    components, Dispatcher, RestServerEndpoint and the ResourceManager. The individual
    components are created by using a factory instance. The ClusterEntrypoint is now
    only responsible for managing the required services needed by the ClusterComponent.
    This design should make the testing of these components easier, improve reusability
    and reduce code duplication.
---
 .../entrypoint/ClassPathJobGraphRetriever.java     |  80 +++++
 .../entrypoint/StandaloneJobClusterEntryPoint.java |  85 +----
 ...st.java => ClassPathJobGraphRetrieverTest.java} |  18 +-
 .../apache/flink/container/entrypoint/TestJob.java |   2 +-
 .../entrypoint/MesosJobClusterEntrypoint.java      |  89 +-----
 .../entrypoint/MesosSessionClusterEntrypoint.java  |  61 +---
 .../MesosResourceManagerFactory.java               |  90 ++++++
 .../runtime/dispatcher/DispatcherFactory.java      |  53 ++++
 .../runtime/dispatcher/JobDispatcherFactory.java   |  86 +++++
 .../dispatcher/SessionDispatcherFactory.java       |  69 ++++
 .../flink/runtime/entrypoint/ClusterComponent.java | 347 +++++++++++++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java      | 284 +++--------------
 .../runtime/entrypoint/FileJobGraphRetriever.java  |  70 +++++
 .../runtime/entrypoint/JobClusterComponent.java    |  51 +++
 .../runtime/entrypoint/JobClusterEntrypoint.java   |  98 ------
 .../runtime/entrypoint/JobGraphRetriever.java      |  38 +++
 .../entrypoint/SessionClusterComponent.java        |  34 ++
 .../entrypoint/SessionClusterEntrypoint.java       |  79 -----
 .../StandaloneSessionClusterEntrypoint.java        |  43 +--
 .../resourcemanager/ResourceManagerFactory.java    |  50 +++
 .../StandaloneResourceManagerFactory.java}         |  46 +--
 .../flink/runtime/rest/JobRestEndpointFactory.java |  66 ++++
 .../flink/runtime/rest/RestEndpointFactory.java    |  50 +++
 .../runtime/rest/SessionRestEndpointFactory.java   |  65 ++++
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |  83 +----
 ...ypoint.java => YarnResourceManagerFactory.java} |  69 +---
 .../entrypoint/YarnSessionClusterEntrypoint.java   |  47 +--
 27 files changed, 1255 insertions(+), 898 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
new file mode 100644
index 0000000..5ded04b
--- /dev/null
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.container.entrypoint;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link JobGraphRetriever} which creates the {@link JobGraph} from a class
+ * on the class path.
+ */
+public class ClassPathJobGraphRetriever implements JobGraphRetriever {
+
+	@Nonnull
+	private final String jobClassName;
+
+	@Nonnull
+	private final SavepointRestoreSettings savepointRestoreSettings;
+
+	@Nonnull
+	private final String[] programArguments;
+
+	public ClassPathJobGraphRetriever(
+			@Nonnull String jobClassName,
+			@Nonnull SavepointRestoreSettings savepointRestoreSettings,
+			@Nonnull String[] programArguments) {
+		this.jobClassName = jobClassName;
+		this.savepointRestoreSettings = savepointRestoreSettings;
+		this.programArguments = programArguments;
+	}
+
+	@Override
+	public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		final PackagedProgram packagedProgram = createPackagedProgram();
+		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+		try {
+			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
+			jobGraph.setAllowQueuedScheduling(true);
+			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+
+			return jobGraph;
+		} catch (Exception e) {
+			throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
+		}
+	}
+
+	private PackagedProgram createPackagedProgram() throws FlinkException {
+		try {
+			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
+			return new PackagedProgram(mainClass, programArguments);
+		} catch (ClassNotFoundException | ProgramInvocationException e) {
+			throw new FlinkException("Could not load the provided entrypoint class.", e);
+		}
+	}
+}
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 0e40095..cdd44b5 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -18,38 +18,20 @@
 
 package org.apache.flink.container.entrypoint;
 
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -79,62 +61,10 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		final PackagedProgram packagedProgram = createPackagedProgram();
-		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
-		try {
-			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, defaultParallelism);
-			jobGraph.setAllowQueuedScheduling(true);
-			jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
-
-			return jobGraph;
-		} catch (Exception e) {
-			throw new FlinkException("Could not create the JobGraph from the provided user code jar.", e);
-		}
-	}
-
-	private PackagedProgram createPackagedProgram() throws FlinkException {
-		try {
-			final Class<?> mainClass = getClass().getClassLoader().loadClass(jobClassName);
-			return new PackagedProgram(mainClass, programArguments);
-		} catch (ClassNotFoundException | ProgramInvocationException e) {
-			throw new FlinkException("Could not load the provided entrypoint class.", e);
-		}
-	}
-
-	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
-		terminationFuture.thenAccept((status) -> shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, null, true));
-	}
-
-	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			resourceManagerRuntimeServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new StandaloneResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManager(),
-			metricRegistry,
-			resourceManagerRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler);
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new JobClusterComponent(
+			StandaloneResourceManagerFactory.INSTANCE,
+			new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
 	}
 
 	public static void main(String[] args) {
@@ -166,5 +96,4 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
 		entrypoint.startCluster();
 	}
-
 }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
similarity index 81%
rename from flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
rename to flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
index 2faec4c..6e460e1 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPointTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
@@ -32,24 +32,24 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 /**
- * Tests for the {@link StandaloneJobClusterEntryPoint}.
+ * Tests for the {@link ClassPathJobGraphRetriever}.
  */
-public class StandaloneJobClusterEntryPointTest extends TestLogger {
+public class ClassPathJobGraphRetrieverTest extends TestLogger {
 
 	public static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
 
 	@Test
 	public void testJobGraphRetrieval() throws FlinkException {
-		final Configuration configuration = new Configuration();
 		final int parallelism = 42;
+		final Configuration configuration = new Configuration();
 		configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism);
-		final StandaloneJobClusterEntryPoint standaloneJobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
-			configuration,
+
+		final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
 			TestJob.class.getCanonicalName(),
 			SavepointRestoreSettings.none(),
 			PROGRAM_ARGUMENTS);
 
-		final JobGraph jobGraph = standaloneJobClusterEntryPoint.retrieveJobGraph(configuration);
+		final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
 		assertThat(jobGraph.getName(), is(equalTo(TestJob.class.getCanonicalName() + "-suffix")));
 		assertThat(jobGraph.getMaximumParallelism(), is(parallelism));
@@ -59,13 +59,13 @@ public class StandaloneJobClusterEntryPointTest extends TestLogger {
 	public void testSavepointRestoreSettings() throws FlinkException {
 		final Configuration configuration = new Configuration();
 		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("foobar", true);
-		final StandaloneJobClusterEntryPoint jobClusterEntryPoint = new StandaloneJobClusterEntryPoint(
-			configuration,
+
+		final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
 			TestJob.class.getCanonicalName(),
 			savepointRestoreSettings,
 			PROGRAM_ARGUMENTS);
 
-		final JobGraph jobGraph = jobClusterEntryPoint.retrieveJobGraph(configuration);
+		final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
 		assertThat(jobGraph.getSavepointRestoreSettings(), is(equalTo(savepointRestoreSettings)));
 	}
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
index ada434d..27d1f32 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/TestJob.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 
 /**
- * Test job which is used for {@link StandaloneJobClusterEntryPointTest}.
+ * Test job which is used for {@link ClassPathJobGraphRetrieverTest}.
  */
 public class TestJob {
 
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 38b61d8..ed2175a 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -20,31 +20,21 @@ package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.cli.CommandLine;
@@ -52,13 +42,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -66,8 +49,6 @@ import java.util.concurrent.CompletableFuture;
  */
 public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
-	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
 	// ------------------------------------------------------------------------
 	//  Command-line options
 	// ------------------------------------------------------------------------
@@ -114,58 +95,6 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new MesosResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			configuration,
-			mesosServices,
-			schedulerConfiguration,
-			taskManagerParameters,
-			taskManagerContainerSpec,
-			webInterfaceUrl);
-	}
-
-	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-		File fp = new File(jobGraphFile);
-
-		try (FileInputStream input = new FileInputStream(fp);
-			ObjectInputStream obInput = new ObjectInputStream(input)) {
-
-			return (JobGraph) obInput.readObject();
-		} catch (FileNotFoundException e) {
-			throw new FlinkException("Could not find the JobGraph file.", e);
-		} catch (ClassNotFoundException | IOException e) {
-			throw new FlinkException("Could not load the JobGraph from file.", e);
-		}
-	}
-
-	@Override
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);
 
@@ -179,7 +108,15 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {}
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new JobClusterComponent(
+			new MesosResourceManagerFactory(
+				mesosServices,
+				schedulerConfiguration,
+				taskManagerParameters,
+				taskManagerContainerSpec),
+			FileJobGraphRetriever.createFrom(configuration));
+	}
 
 	public static void main(String[] args) {
 		// startup checks and logging
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 70369f6..0cc3053 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -20,25 +20,17 @@ package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -49,8 +41,6 @@ import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -104,42 +94,6 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new MesosResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			configuration,
-			mesosServices,
-			mesosConfig,
-			taskManagerParameters,
-			taskManagerContainerSpec,
-			webInterfaceUrl);
-	}
-
-	@Override
 	protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
 		final CompletableFuture<Void> serviceShutDownFuture = super.stopClusterServices(cleanupHaData);
 
@@ -152,6 +106,15 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			});
 	}
 
+	@Override
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new SessionClusterComponent(new MesosResourceManagerFactory(
+			mesosServices,
+			mesosConfig,
+			taskManagerParameters,
+			taskManagerContainerSpec));
+	}
+
 	public static void main(String[] args) {
 		// startup checks and logging
 		EnvironmentInformation.logEnvironmentInfo(LOG, MesosSessionClusterEntrypoint.class.getSimpleName(), args);
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
new file mode 100644
index 0000000..9582e9f
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManagerFactory} which creates a {@link MesosResourceManager}.
+ */
+public class MesosResourceManagerFactory implements ResourceManagerFactory<RegisteredMesosWorkerNode> {
+
+	@Nonnull
+	private final MesosServices mesosServices;
+
+	@Nonnull
+	private final MesosConfiguration schedulerConfiguration;
+
+	@Nonnull
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	@Nonnull
+	private final ContainerSpecification taskManagerContainerSpec;
+
+	public MesosResourceManagerFactory(@Nonnull MesosServices mesosServices, @Nonnull MesosConfiguration schedulerConfiguration, @Nonnull MesosTaskManagerParameters taskManagerParameters, @Nonnull ContainerSpecification taskManagerContainerSpec) {
+		this.mesosServices = mesosServices;
+		this.schedulerConfiguration = schedulerConfiguration;
+		this.taskManagerParameters = taskManagerParameters;
+		this.taskManagerContainerSpec = taskManagerContainerSpec;
+	}
+
+	@Override
+	public ResourceManager<RegisteredMesosWorkerNode> createResourceManager(Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl) throws Exception {
+		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
+		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
+			rmServicesConfiguration,
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor());
+
+		return new MesosResourceManager(
+			rpcService,
+			ResourceManager.RESOURCE_MANAGER_NAME,
+			resourceId,
+			highAvailabilityServices,
+			heartbeatServices,
+			rmRuntimeServices.getSlotManager(),
+			metricRegistry,
+			rmRuntimeServices.getJobLeaderIdService(),
+			clusterInformation,
+			fatalErrorHandler,
+			configuration,
+			mesosServices,
+			schedulerConfiguration,
+			taskManagerParameters,
+			taskManagerContainerSpec,
+			webInterfaceUrl);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
new file mode 100644
index 0000000..5952299
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link Dispatcher} factory interface.
+ */
+public interface DispatcherFactory<T extends Dispatcher> {
+
+	/**
+	 * Create a {@link Dispatcher} of the given type {@link T}.
+	 */
+	T createDispatcher(
+		Configuration configuration,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		ResourceManagerGateway resourceManagerGateway,
+		BlobServer blobServer,
+		HeartbeatServices heartbeatServices,
+		JobManagerMetricGroup jobManagerMetricGroup,
+		@Nullable String metricQueryServicePath,
+		ArchivedExecutionGraphStore archivedExecutionGraphStore,
+		FatalErrorHandler fatalErrorHandler,
+		@Nullable String restAddress,
+		HistoryServerArchivist historyServerArchivist) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
new file mode 100644
index 0000000..488f2fc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobDispatcherFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.JobGraphRetriever;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.runtime.entrypoint.ClusterEntrypoint.EXECUTION_MODE;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link MiniDispatcher}.
+ */
+public class JobDispatcherFactory implements DispatcherFactory<MiniDispatcher> {
+
+	private final JobGraphRetriever jobGraphRetriever;
+
+	public JobDispatcherFactory(JobGraphRetriever jobGraphRetriever) {
+		this.jobGraphRetriever = jobGraphRetriever;
+	}
+
+	@Override
+	public MiniDispatcher createDispatcher(
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			ResourceManagerGateway resourceManagerGateway,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			JobManagerMetricGroup jobManagerMetricGroup,
+			@Nullable String metricQueryServicePath,
+			ArchivedExecutionGraphStore archivedExecutionGraphStore,
+			FatalErrorHandler fatalErrorHandler,
+			@Nullable String restAddress,
+			HistoryServerArchivist historyServerArchivist) throws Exception {
+		final JobGraph jobGraph = jobGraphRetriever.retrieveJobGraph(configuration);
+
+		final String executionModeValue = configuration.getString(EXECUTION_MODE);
+
+		final ClusterEntrypoint.ExecutionMode executionMode = ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
+
+		return new MiniDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME,
+			configuration,
+			highAvailabilityServices,
+			resourceManagerGateway,
+			blobServer,
+			heartbeatServices,
+			jobManagerMetricGroup,
+			metricQueryServicePath,
+			archivedExecutionGraphStore,
+			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			fatalErrorHandler,
+			restAddress,
+			historyServerArchivist,
+			jobGraph,
+			executionMode);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
new file mode 100644
index 0000000..18e29a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SessionDispatcherFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link DispatcherFactory} which creates a {@link StandaloneDispatcher}.
+ */
+public enum SessionDispatcherFactory implements DispatcherFactory<Dispatcher> {
+	INSTANCE;
+
+	@Override
+	public Dispatcher createDispatcher(
+				Configuration configuration,
+				RpcService rpcService,
+				HighAvailabilityServices highAvailabilityServices,
+				ResourceManagerGateway resourceManagerGateway,
+				BlobServer blobServer,
+				HeartbeatServices heartbeatServices,
+				JobManagerMetricGroup jobManagerMetricGroup,
+				@Nullable String metricQueryServicePath,
+				ArchivedExecutionGraphStore archivedExecutionGraphStore,
+				FatalErrorHandler fatalErrorHandler,
+				@Nullable String restAddress,
+				HistoryServerArchivist historyServerArchivist) throws Exception {
+		// create the default dispatcher
+		return new StandaloneDispatcher(
+			rpcService,
+			Dispatcher.DISPATCHER_NAME,
+			configuration,
+			highAvailabilityServices,
+			resourceManagerGateway,
+			blobServer,
+			heartbeatServices,
+			jobManagerMetricGroup,
+			metricQueryServicePath,
+			archivedExecutionGraphStore,
+			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
+			fatalErrorHandler,
+			restAddress,
+			historyServerArchivist);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
new file mode 100644
index 0000000..af02729
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestEndpointFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
+ * in the same process.
+ */
+public class ClusterComponent<T extends Dispatcher> implements AutoCloseableAsync {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ClusterComponent.class);
+
+	private final Object lock = new Object();
+
+	private final DispatcherFactory<T> dispatcherFactory;
+
+	private final ResourceManagerFactory<?> resourceManagerFactory;
+
+	private final RestEndpointFactory<?> restEndpointFactory;
+
+	private final CompletableFuture<Void> terminationFuture;
+
+	private final CompletableFuture<ApplicationStatus> shutDownFuture;
+
+	@GuardedBy("lock")
+	private State state;
+
+	@GuardedBy("lock")
+	private ResourceManager<?> resourceManager;
+
+	@GuardedBy("lock")
+	private T dispatcher;
+
+	@GuardedBy("lock")
+	private LeaderRetrievalService dispatcherLeaderRetrievalService;
+
+	@GuardedBy("lock")
+	private LeaderRetrievalService resourceManagerRetrievalService;
+
+	@GuardedBy("lock")
+	private WebMonitorEndpoint<?> webMonitorEndpoint;
+
+	@GuardedBy("lock")
+	private JobManagerMetricGroup jobManagerMetricGroup;
+
+	public ClusterComponent(
+			DispatcherFactory<T> dispatcherFactory,
+			ResourceManagerFactory<?> resourceManagerFactory,
+			RestEndpointFactory<?> restEndpointFactory) {
+		this.dispatcherFactory = dispatcherFactory;
+		this.resourceManagerFactory = resourceManagerFactory;
+		this.restEndpointFactory = restEndpointFactory;
+		this.terminationFuture = new CompletableFuture<>();
+		this.shutDownFuture = new CompletableFuture<>();
+		this.state = State.CREATED;
+
+		terminationFuture.whenComplete(
+			(aVoid, throwable) -> {
+				if (throwable != null) {
+					shutDownFuture.completeExceptionally(throwable);
+				} else {
+					shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+				}
+			});
+	}
+
+	public T getDispatcher() {
+		synchronized (lock) {
+			return dispatcher;
+		}
+	}
+
+	public CompletableFuture<Void> getTerminationFuture() {
+		return terminationFuture;
+	}
+
+	public CompletableFuture<ApplicationStatus> getShutDownFuture() {
+		return shutDownFuture;
+	}
+
+	public void startComponent(
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			BlobServer blobServer,
+			HeartbeatServices heartbeatServices,
+			MetricRegistry metricRegistry,
+			ArchivedExecutionGraphStore archivedExecutionGraphStore,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		synchronized (lock) {
+			Preconditions.checkState(state == State.CREATED);
+			state = State.RUNNING;
+
+			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+
+			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
+
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+				rpcService,
+				DispatcherGateway.class,
+				DispatcherId::fromUuid,
+				10,
+				Time.milliseconds(50L));
+
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+				rpcService,
+				ResourceManagerGateway.class,
+				ResourceManagerId::fromUuid,
+				10,
+				Time.milliseconds(50L));
+
+			// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
+			final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+			final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+			webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
+				configuration,
+				dispatcherGatewayRetriever,
+				resourceManagerGatewayRetriever,
+				blobServer,
+				rpcService.getExecutor(),
+				new AkkaQueryServiceRetriever(actorSystem, timeout),
+				highAvailabilityServices.getWebMonitorLeaderElectionService(),
+				fatalErrorHandler);
+
+			LOG.debug("Starting Dispatcher REST endpoint.");
+			webMonitorEndpoint.start();
+
+			resourceManager = resourceManagerFactory.createResourceManager(
+				configuration,
+				ResourceID.generate(),
+				rpcService,
+				highAvailabilityServices,
+				heartbeatServices,
+				metricRegistry,
+				fatalErrorHandler,
+				new ClusterInformation(rpcService.getAddress(), blobServer.getPort()),
+				webMonitorEndpoint.getRestBaseUrl());
+
+			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
+				metricRegistry,
+				rpcService.getAddress(),
+				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
+
+			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
+
+			dispatcher = dispatcherFactory.createDispatcher(
+				configuration,
+				rpcService,
+				highAvailabilityServices,
+				resourceManager.getSelfGateway(ResourceManagerGateway.class),
+				blobServer,
+				heartbeatServices,
+				jobManagerMetricGroup,
+				metricRegistry.getMetricQueryServicePath(),
+				archivedExecutionGraphStore,
+				fatalErrorHandler,
+				webMonitorEndpoint.getRestBaseUrl(),
+				historyServerArchivist);
+
+			registerShutDownFuture(dispatcher, shutDownFuture);
+
+			LOG.debug("Starting ResourceManager.");
+			resourceManager.start();
+			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
+
+			LOG.debug("Starting Dispatcher.");
+			dispatcher.start();
+			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+		}
+	}
+
+	protected void registerShutDownFuture(T dispatcher, CompletableFuture<ApplicationStatus> shutDownFuture) {
+			dispatcher
+				.getTerminationFuture()
+				.whenComplete(
+					(aVoid, throwable) -> {
+						if (throwable != null) {
+							shutDownFuture.completeExceptionally(throwable);
+						} else {
+							shutDownFuture.complete(ApplicationStatus.SUCCEEDED);
+						}
+					});
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		synchronized (lock) {
+			if (state == State.RUNNING) {
+				Exception exception = null;
+
+				final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
+
+				if (dispatcherLeaderRetrievalService != null) {
+					try {
+						dispatcherLeaderRetrievalService.stop();
+					} catch (Exception e) {
+						exception = ExceptionUtils.firstOrSuppressed(e, exception);
+					}
+				}
+
+				if (resourceManagerRetrievalService != null) {
+					try {
+						resourceManagerRetrievalService.stop();
+					} catch (Exception e) {
+						exception = ExceptionUtils.firstOrSuppressed(e, exception);
+					}
+				}
+
+				if (webMonitorEndpoint != null) {
+					terminationFutures.add(webMonitorEndpoint.closeAsync());
+				}
+
+				if (dispatcher != null) {
+					dispatcher.shutDown();
+					terminationFutures.add(dispatcher.getTerminationFuture());
+				}
+
+				if (resourceManager != null) {
+					resourceManager.shutDown();
+					terminationFutures.add(resourceManager.getTerminationFuture());
+				}
+
+				if (exception != null) {
+					terminationFutures.add(FutureUtils.completedExceptionally(exception));
+				}
+
+				final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+				final CompletableFuture<Void> metricGroupTerminationFuture;
+
+				if (jobManagerMetricGroup != null) {
+					metricGroupTerminationFuture = FutureUtils.runAfterwards(
+						componentTerminationFuture,
+						() -> {
+							synchronized (lock) {
+								jobManagerMetricGroup.close();
+							}
+						});
+				} else {
+					metricGroupTerminationFuture = componentTerminationFuture;
+				}
+
+				metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
+					if (throwable != null) {
+						terminationFuture.completeExceptionally(throwable);
+					} else {
+						terminationFuture.complete(aVoid);
+					}
+				});
+			} else if (state == State.CREATED) {
+				terminationFuture.complete(null);
+			}
+
+			state = State.TERMINATED;
+			return terminationFuture;
+		}
+	}
+
+	/**
+	 * Deregister the Flink application from the resource management system by signalling
+	 * the {@link ResourceManager}.
+	 *
+	 * @param applicationStatus to terminate the application with
+	 * @param diagnostics additional information about the shut down, can be {@code null}
+	 * @return Future which is completed once the shut down
+	 */
+	public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
+		synchronized (lock) {
+			if (resourceManager != null) {
+				final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+				return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+			} else {
+				return CompletableFuture.completedFuture(null);
+			}
+		}
+	}
+
+	enum State {
+		CREATED,
+		RUNNING,
+		TERMINATED
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 1a8c058..39e1265 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -32,32 +32,19 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherId;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -65,11 +52,6 @@ import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
@@ -109,7 +91,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);
 
-	protected static final int SUCCESS_RETURN_CODE = 0;
 	protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
 	protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
 
@@ -125,6 +106,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
 	@GuardedBy("lock")
+	private ClusterComponent<?> dispatcherComponent;
+
+	@GuardedBy("lock")
 	private MetricRegistryImpl metricRegistry;
 
 	@GuardedBy("lock")
@@ -140,32 +124,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	private RpcService commonRpcService;
 
 	@GuardedBy("lock")
-	private ResourceManager<?> resourceManager;
-
-	@GuardedBy("lock")
-	private Dispatcher dispatcher;
-
-	@GuardedBy("lock")
-	private LeaderRetrievalService dispatcherLeaderRetrievalService;
-
-	@GuardedBy("lock")
-	private LeaderRetrievalService resourceManagerRetrievalService;
-
-	@GuardedBy("lock")
-	private WebMonitorEndpoint<?> webMonitorEndpoint;
-
-	@GuardedBy("lock")
 	private ArchivedExecutionGraphStore archivedExecutionGraphStore;
 
 	@GuardedBy("lock")
 	private TransientBlobCache transientBlobCache;
 
-	@GuardedBy("lock")
-	private ClusterInformation clusterInformation;
-
-	@GuardedBy("lock")
-	private JobManagerMetricGroup jobManagerMetricGroup;
-
 	private final Thread shutDownHook;
 
 	protected ClusterEntrypoint(Configuration configuration) {
@@ -223,7 +186,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return SecurityUtils.getInstalledContext();
 	}
 
-	protected void runCluster(Configuration configuration) throws Exception {
+	private void runCluster(Configuration configuration) throws Exception {
 		synchronized (lock) {
 			initializeServices(configuration);
 
@@ -231,27 +194,35 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
 			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
 
-			startClusterComponents(
+			dispatcherComponent = createDispatcherComponent(configuration);
+
+			dispatcherComponent.startComponent(
 				configuration,
 				commonRpcService,
 				haServices,
 				blobServer,
 				heartbeatServices,
-				metricRegistry);
+				metricRegistry,
+				archivedExecutionGraphStore,
+				this);
 
-			dispatcher.getTerminationFuture().whenComplete(
-				(Void value, Throwable throwable) -> {
+			dispatcherComponent.getShutDownFuture().whenComplete(
+				(ApplicationStatus applicationStatus, Throwable throwable) -> {
 					if (throwable != null) {
-						LOG.info("Could not properly terminate the Dispatcher.", throwable);
+						shutDownAndTerminate(
+							RUNTIME_FAILURE_RETURN_CODE,
+							ApplicationStatus.UNKNOWN,
+							ExceptionUtils.stringifyException(throwable),
+							false);
+					} else {
+						// This is the general shutdown path. If a separate more specific shutdown was
+						// already triggered, this will do nothing
+						shutDownAndTerminate(
+							applicationStatus.processExitCode(),
+							applicationStatus,
+							null,
+							true);
 					}
-
-					// This is the general shutdown path. If a separate more specific shutdown was
-					// already triggered, this will do nothing
-					shutDownAndTerminate(
-						SUCCESS_RETURN_CODE,
-						ApplicationStatus.SUCCEEDED,
-						throwable != null ? throwable.getMessage() : null,
-						true);
 				});
 		}
 	}
@@ -283,99 +254,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
 			archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
 
-			clusterInformation = new ClusterInformation(
-				commonRpcService.getAddress(),
-				blobServer.getPort());
-
 			transientBlobCache = new TransientBlobCache(
 				configuration,
 				new InetSocketAddress(
-					clusterInformation.getBlobServerHostname(),
-					clusterInformation.getBlobServerPort()));
-		}
-	}
-
-	protected void startClusterComponents(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
-		synchronized (lock) {
-			dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
-
-			resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
-
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
-				rpcService,
-				DispatcherGateway.class,
-				DispatcherId::fromUuid,
-				10,
-				Time.milliseconds(50L));
-
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
-				rpcService,
-				ResourceManagerGateway.class,
-				ResourceManagerId::fromUuid,
-				10,
-				Time.milliseconds(50L));
-
-			// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
-			final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
-			final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-
-			webMonitorEndpoint = createRestEndpoint(
-				configuration,
-				dispatcherGatewayRetriever,
-				resourceManagerGatewayRetriever,
-				transientBlobCache,
-				rpcService.getExecutor(),
-				new AkkaQueryServiceRetriever(actorSystem, timeout),
-				highAvailabilityServices.getWebMonitorLeaderElectionService());
-
-			LOG.debug("Starting Dispatcher REST endpoint.");
-			webMonitorEndpoint.start();
-
-			resourceManager = createResourceManager(
-				configuration,
-				ResourceID.generate(),
-				rpcService,
-				highAvailabilityServices,
-				heartbeatServices,
-				metricRegistry,
-				this,
-				clusterInformation,
-				webMonitorEndpoint.getRestBaseUrl());
-
-			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
-				metricRegistry,
-				rpcService.getAddress(),
-				ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
-
-			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
-
-			dispatcher = createDispatcher(
-				configuration,
-				rpcService,
-				highAvailabilityServices,
-				resourceManager.getSelfGateway(ResourceManagerGateway.class),
-				blobServer,
-				heartbeatServices,
-				jobManagerMetricGroup,
-				metricRegistry.getMetricQueryServicePath(),
-				archivedExecutionGraphStore,
-				this,
-				webMonitorEndpoint.getRestBaseUrl(),
-				historyServerArchivist);
-
-			LOG.debug("Starting ResourceManager.");
-			resourceManager.start();
-			resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
-
-			LOG.debug("Starting Dispatcher.");
-			dispatcher.start();
-			dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
+					commonRpcService.getAddress(),
+					blobServer.getPort()));
 		}
 	}
 
@@ -477,63 +360,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		}
 	}
 
-	protected CompletableFuture<Void> stopClusterComponents() {
-		synchronized (lock) {
-
-			Exception exception = null;
-
-			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
-
-			if (dispatcherLeaderRetrievalService != null) {
-				try {
-					dispatcherLeaderRetrievalService.stop();
-				} catch (Exception e) {
-					exception = ExceptionUtils.firstOrSuppressed(e, exception);
-				}
-			}
-
-			if (resourceManagerRetrievalService != null) {
-				try {
-					resourceManagerRetrievalService.stop();
-				} catch (Exception e) {
-					exception = ExceptionUtils.firstOrSuppressed(e, exception);
-				}
-			}
-
-			if (webMonitorEndpoint != null) {
-				terminationFutures.add(webMonitorEndpoint.closeAsync());
-			}
-
-			if (dispatcher != null) {
-				dispatcher.shutDown();
-				terminationFutures.add(dispatcher.getTerminationFuture());
-			}
-
-			if (resourceManager != null) {
-				resourceManager.shutDown();
-				terminationFutures.add(resourceManager.getTerminationFuture());
-			}
-
-			if (exception != null) {
-				terminationFutures.add(FutureUtils.completedExceptionally(exception));
-			}
-
-			final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
-
-			if (jobManagerMetricGroup != null) {
-				return FutureUtils.runAfterwards(
-					componentTerminationFuture,
-					() -> {
-						synchronized (lock) {
-							jobManagerMetricGroup.close();
-						}
-					});
-			} else {
-				return componentTerminationFuture;
-			}
-		}
-	}
-
 	@Override
 	public void onFatalError(Throwable exception) {
 		LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
@@ -563,14 +389,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		if (isShutDown.compareAndSet(false, true)) {
 			LOG.info("Stopping {}.", getClass().getSimpleName());
 
-			final CompletableFuture<Void> shutDownApplicationFuture = deregisterApplication(applicationStatus, diagnostics);
-
-			final CompletableFuture<Void> componentShutdownFuture = FutureUtils.composeAfterwards(
-				shutDownApplicationFuture,
-				this::stopClusterComponents);
+			final CompletableFuture<Void> shutDownApplicationFuture = closeClusterComponent(applicationStatus, diagnostics);
 
 			final CompletableFuture<Void> serviceShutdownFuture = FutureUtils.composeAfterwards(
-				componentShutdownFuture,
+				shutDownApplicationFuture,
 				() -> stopClusterServices(cleanupHaData));
 
 			final CompletableFuture<Void> cleanupDirectoriesFuture = FutureUtils.runAfterwards(
@@ -597,10 +419,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		boolean cleanupHaData) {
 
 		if (isTerminating.compareAndSet(false, true)) {
-			LOG.info("Shut down and terminate {} with return code {} and application status {}.",
+			LOG.info("Shut down and terminate {} with return code {} and application status {}. Diagnostics {}.",
 				getClass().getSimpleName(),
 				returnCode,
-				applicationStatus);
+				applicationStatus,
+				diagnostics);
 
 			shutDownAsync(
 				cleanupHaData,
@@ -628,11 +451,12 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	 * @param diagnostics additional information about the shut down, can be {@code null}
 	 * @return Future which is completed once the shut down
 	 */
-	private CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
+	private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
 		synchronized (lock) {
-			if (resourceManager != null) {
-				final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-				return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+			if (dispatcherComponent != null) {
+				final CompletableFuture<Void> deregisterApplicationFuture = dispatcherComponent.deregisterApplication(applicationStatus, diagnostics);
+
+				return FutureUtils.runAfterwards(deregisterApplicationFuture, dispatcherComponent::closeAsync);
 			} else {
 				return CompletableFuture.completedFuture(null);
 			}
@@ -656,39 +480,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	// Abstract methods
 	// --------------------------------------------------
 
-	protected abstract Dispatcher createDispatcher(
-		Configuration configuration,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		ResourceManagerGateway resourceManagerGateway,
-		BlobServer blobServer,
-		HeartbeatServices heartbeatServices,
-		JobManagerMetricGroup jobManagerMetricGroup,
-		@Nullable String metricQueryServicePath,
-		ArchivedExecutionGraphStore archivedExecutionGraphStore,
-		FatalErrorHandler fatalErrorHandler,
-		@Nullable String restAddress,
-		HistoryServerArchivist historyServerArchivist) throws Exception;
-
-	protected abstract ResourceManager<?> createResourceManager(
-		Configuration configuration,
-		ResourceID resourceId,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
-		FatalErrorHandler fatalErrorHandler,
-		ClusterInformation clusterInformation,
-		@Nullable String webInterfaceUrl) throws Exception;
-
-	protected abstract WebMonitorEndpoint<?> createRestEndpoint(
-		Configuration configuration,
-		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-		TransientBlobService transientBlobService,
-		Executor executor,
-		MetricQueryServiceRetriever metricQueryServiceRetriever,
-		LeaderElectionService leaderElectionService) throws Exception;
+	protected abstract ClusterComponent<?> createDispatcherComponent(Configuration configuration);
 
 	protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 		Configuration configuration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
new file mode 100644
index 0000000..7a194f6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/FileJobGraphRetriever.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * {@link JobGraphRetriever} implementation which retrieves the {@link JobGraph} from
+ * a file on disk.
+ */
+public class FileJobGraphRetriever implements JobGraphRetriever {
+
+	public static final ConfigOption<String> JOB_GRAPH_FILE_PATH = ConfigOptions
+		.key("internal.jobgraph-path")
+		.defaultValue("job.graph");
+
+	@Nonnull
+	private final String jobGraphFile;
+
+	public FileJobGraphRetriever(@Nonnull String jobGraphFile) {
+		this.jobGraphFile = jobGraphFile;
+	}
+
+	@Override
+	public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
+		File fp = new File(jobGraphFile);
+
+		try (FileInputStream input = new FileInputStream(fp);
+			ObjectInputStream obInput = new ObjectInputStream(input)) {
+
+			return (JobGraph) obInput.readObject();
+		} catch (FileNotFoundException e) {
+			throw new FlinkException("Could not find the JobGraph file.", e);
+		} catch (ClassNotFoundException | IOException e) {
+			throw new FlinkException("Could not load the JobGraph from file.", e);
+		}
+	}
+
+	public static FileJobGraphRetriever createFrom(Configuration configuration) {
+		return new FileJobGraphRetriever(configuration.getString(JOB_GRAPH_FILE_PATH));
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java
new file mode 100644
index 0000000..17583ac
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.MiniDispatcher;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@link ClusterComponent} for a job cluster. The dispatcher component starts
+ * a {@link MiniDispatcher}.
+ */
+public class JobClusterComponent extends ClusterComponent<MiniDispatcher> {
+
+	public JobClusterComponent(ResourceManagerFactory<?> resourceManagerFactory, JobGraphRetriever jobActions) {
+		super(new JobDispatcherFactory(jobActions), resourceManagerFactory, JobRestEndpointFactory.INSTANCE);
+	}
+
+	@Override
+	protected void registerShutDownFuture(MiniDispatcher dispatcher, CompletableFuture<ApplicationStatus> shutDownFuture) {
+		super.registerShutDownFuture(dispatcher, shutDownFuture);
+
+		dispatcher.getJobTerminationFuture().whenComplete((applicationStatus, throwable) -> {
+			if (throwable != null) {
+				shutDownFuture.completeExceptionally(throwable);
+			} else {
+				shutDownFuture.complete(applicationStatus);
+			}
+		});
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 80a9da2..0426df9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -19,35 +19,9 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.MiniDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.FlinkException;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for per-job cluster entry points.
@@ -59,81 +33,9 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 	}
 
 	@Override
-	protected MiniDispatcherRestEndpoint createRestEndpoint(
-			Configuration configuration,
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-			TransientBlobService transientBlobService,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
-			LeaderElectionService leaderElectionService) throws Exception {
-		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
-
-		return new MiniDispatcherRestEndpoint(
-			RestServerEndpointConfiguration.fromConfiguration(configuration),
-			dispatcherGatewayRetriever,
-			configuration,
-			restHandlerConfiguration,
-			resourceManagerGatewayRetriever,
-			transientBlobService,
-			executor,
-			metricQueryServiceRetriever,
-			leaderElectionService,
-			this);
-	}
-
-	@Override
 	protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 			Configuration configuration,
 			ScheduledExecutor scheduledExecutor) {
 		return new MemoryArchivedExecutionGraphStore();
 	}
-
-	@Override
-	protected Dispatcher createDispatcher(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			ResourceManagerGateway resourceManagerGateway,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			JobManagerMetricGroup jobManagerMetricGroup,
-			@Nullable String metricQueryServicePath,
-			ArchivedExecutionGraphStore archivedExecutionGraphStore,
-			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress,
-			HistoryServerArchivist historyServerArchivist) throws Exception {
-
-		final JobGraph jobGraph = retrieveJobGraph(configuration);
-
-		final String executionModeValue = configuration.getString(EXECUTION_MODE);
-
-		final ExecutionMode executionMode = ExecutionMode.valueOf(executionModeValue);
-
-		final MiniDispatcher dispatcher = new MiniDispatcher(
-			rpcService,
-			Dispatcher.DISPATCHER_NAME,
-			configuration,
-			highAvailabilityServices,
-			resourceManagerGateway,
-			blobServer,
-			heartbeatServices,
-			jobManagerMetricGroup,
-			metricQueryServicePath,
-			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-			fatalErrorHandler,
-			restAddress,
-			historyServerArchivist,
-			jobGraph,
-			executionMode);
-
-		registerShutdownActions(dispatcher.getJobTerminationFuture());
-
-		return dispatcher;
-	}
-
-	protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
-
-	protected abstract void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
new file mode 100644
index 0000000..e2ace15
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobGraphRetriever.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Interface which allows to retrieve the {@link JobGraph}.
+ */
+public interface JobGraphRetriever {
+
+	/**
+	 * Retrieve the {@link JobGraph}.
+	 *
+	 * @param configuration cluster configuration
+	 * @return the retrieved {@link JobGraph}.
+	 * @throws FlinkException if the {@link JobGraph} could not be retrieved
+	 */
+	JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java
new file mode 100644
index 0000000..8ab0701
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
+import org.apache.flink.runtime.rest.SessionRestEndpointFactory;
+
+/**
+ * {@link ClusterComponent} used by session clusters.
+ */
+public class SessionClusterComponent extends ClusterComponent<Dispatcher> {
+
+	public SessionClusterComponent(ResourceManagerFactory<?> resourceManagerFactory) {
+		super(SessionDispatcherFactory.INSTANCE, resourceManagerFactory, SessionRestEndpointFactory.INSTANCE);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 40eb8b7..1fb693c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -22,35 +22,14 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.Dispatcher;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
-import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
-import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 
-import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.Executor;
 
 /**
  * Base class for session cluster entry points.
@@ -77,62 +56,4 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			scheduledExecutor,
 			Ticker.systemTicker());
 	}
-
-	@Override
-	protected DispatcherRestEndpoint createRestEndpoint(
-			Configuration configuration,
-			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
-			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
-			TransientBlobService transientBlobService,
-			Executor executor,
-			MetricQueryServiceRetriever metricQueryServiceRetriever,
-			LeaderElectionService leaderElectionService) throws Exception {
-
-		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
-
-		return new DispatcherRestEndpoint(
-			RestServerEndpointConfiguration.fromConfiguration(configuration),
-			dispatcherGatewayRetriever,
-			configuration,
-			restHandlerConfiguration,
-			resourceManagerGatewayRetriever,
-			transientBlobService,
-			executor,
-			metricQueryServiceRetriever,
-			leaderElectionService,
-			this);
-	}
-
-	@Override
-	protected Dispatcher createDispatcher(
-			Configuration configuration,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			ResourceManagerGateway resourceManagerGateway,
-			BlobServer blobServer,
-			HeartbeatServices heartbeatServices,
-			JobManagerMetricGroup jobManagerMetricGroup,
-			@Nullable String metricQueryServicePath,
-			ArchivedExecutionGraphStore archivedExecutionGraphStore,
-			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress,
-			HistoryServerArchivist historyServerArchivist) throws Exception {
-
-		// create the default dispatcher
-		return new StandaloneDispatcher(
-			rpcService,
-			Dispatcher.DISPATCHER_NAME,
-			configuration,
-			highAvailabilityServices,
-			resourceManagerGateway,
-			blobServer,
-			heartbeatServices,
-			jobManagerMetricGroup,
-			metricQueryServicePath,
-			archivedExecutionGraphStore,
-			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
-			fatalErrorHandler,
-			restAddress,
-			historyServerArchivist);
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index edff87b..e92248c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -19,24 +19,12 @@
 package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
-import javax.annotation.Nullable;
-
 /**
  * Entry point for the standalone session cluster.
  */
@@ -47,33 +35,8 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			resourceManagerRuntimeServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new StandaloneResourceManager(
-			rpcService,
-			FlinkResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			highAvailabilityServices,
-			heartbeatServices,
-			resourceManagerRuntimeServices.getSlotManager(),
-			metricRegistry,
-			resourceManagerRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler);
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE);
 	}
 
 	public static void main(String[] args) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
new file mode 100644
index 0000000..91a7b26
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ResourceManager} factory.
+ *
+ * @param <T> type of the workers of the ResourceManager
+ */
+public interface ResourceManagerFactory<T extends ResourceIDRetrievable> {
+
+	ResourceManager<T> createResourceManager(
+		Configuration configuration,
+		ResourceID resourceId,
+		RpcService rpcService,
+		HighAvailabilityServices highAvailabilityServices,
+		HeartbeatServices heartbeatServices,
+		MetricRegistry metricRegistry,
+		FatalErrorHandler fatalErrorHandler,
+		ClusterInformation clusterInformation,
+		@Nullable String webInterfaceUrl) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
similarity index 56%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index edff87b..c8e314f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -16,38 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.entrypoint;
+package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
 
 import javax.annotation.Nullable;
 
 /**
- * Entry point for the standalone session cluster.
+ * {@link ResourceManagerFactory} which creates a {@link StandaloneResourceManager}.
  */
-public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {
-
-	public StandaloneSessionClusterEntrypoint(Configuration configuration) {
-		super(configuration);
-	}
+public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<ResourceID> {
+	INSTANCE;
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
+	public ResourceManager<ResourceID> createResourceManager(
 			Configuration configuration,
 			ResourceID resourceId,
 			RpcService rpcService,
@@ -75,28 +65,4 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 			clusterInformation,
 			fatalErrorHandler);
 	}
-
-	public static void main(String[] args) {
-		// startup checks and logging
-		EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
-		SignalHandler.register(LOG);
-		JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-		EntrypointClusterConfiguration entrypointClusterConfiguration = null;
-		final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());
-
-		try {
-			entrypointClusterConfiguration = commandLineParser.parse(args);
-		} catch (FlinkParseException e) {
-			LOG.error("Could not parse command line arguments {}.", args, e);
-			commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
-			System.exit(1);
-		}
-
-		Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
-
-		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
-
-		entrypoint.startCluster();
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
new file mode 100644
index 0000000..da4b063
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/JobRestEndpointFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link MiniDispatcherRestEndpoint}.
+ */
+public enum JobRestEndpointFactory implements RestEndpointFactory<RestfulGateway> {
+	INSTANCE;
+
+	@Override
+	public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(
+			Configuration configuration,
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			TransientBlobService transientBlobService,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			LeaderElectionService leaderElectionService,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+		return new MiniDispatcherRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			dispatcherGatewayRetriever,
+			configuration,
+			restHandlerConfiguration,
+			resourceManagerGatewayRetriever,
+			transientBlobService,
+			executor,
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
new file mode 100644
index 0000000..ffdc0cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestEndpointFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link WebMonitorEndpoint} factory.
+ *
+ * @param <T> type of the {@link RestfulGateway}
+ */
+public interface RestEndpointFactory<T extends RestfulGateway> {
+
+	WebMonitorEndpoint<T> createRestEndpoint(
+		Configuration configuration,
+		LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+		LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+		TransientBlobService transientBlobService,
+		Executor executor,
+		MetricQueryServiceRetriever metricQueryServiceRetriever,
+		LeaderElectionService leaderElectionService,
+		FatalErrorHandler fatalErrorHandler) throws Exception;
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
new file mode 100644
index 0000000..359efbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/SessionRestEndpointFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}.
+ */
+public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
+	INSTANCE;
+
+	@Override
+	public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
+			Configuration configuration,
+			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
+			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			TransientBlobService transientBlobService,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever,
+			LeaderElectionService leaderElectionService,
+			FatalErrorHandler fatalErrorHandler) throws Exception {
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+		return new DispatcherRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			dispatcherGatewayRetriever,
+			configuration,
+			restHandlerConfiguration,
+			resourceManagerGatewayRetriever,
+			transientBlobService,
+			executor,
+			metricQueryServiceRetriever,
+			leaderElectionService,
+			fatalErrorHandler);
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 482e2b7..a52975a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -19,48 +19,27 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
+import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Entry point for Yarn per-job clusters.
  */
 public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 
-	/** The job graph file path. */
-	public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
-
 	private final String workingDirectory;
 
 	public YarnJobClusterEntrypoint(
@@ -82,58 +61,10 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new YarnResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			configuration,
-			System.getenv(),
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			webInterfaceUrl);
-	}
-
-	@Override
-	protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
-		String jobGraphFile = configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph");
-		File fp = new File(jobGraphFile);
-
-		try (FileInputStream input = new FileInputStream(fp);
-			ObjectInputStream obInput = new ObjectInputStream(input)) {
-
-			return (JobGraph) obInput.readObject();
-		} catch (FileNotFoundException e) {
-			throw new FlinkException("Could not find the JobGraph file.", e);
-		} catch (ClassNotFoundException | IOException e) {
-			throw new FlinkException("Could not load the JobGraph from file.", e);
-		}
-	}
-
-	@Override
-	protected void registerShutdownActions(CompletableFuture<ApplicationStatus> terminationFuture) {
-		terminationFuture.thenAccept((status) ->
-			shutDownAndTerminate(status.processExitCode(), status, null, true));
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new JobClusterComponent(
+			YarnResourceManagerFactory.INSTANCE,
+			FileJobGraphRetriever.createFrom(configuration));
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
similarity index 54%
copy from flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
copy to flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index 6c92861..bfd1b4a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -21,56 +21,28 @@ package org.apache.flink.yarn.entrypoint;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.security.SecurityContext;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.YarnResourceManager;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.flink.yarn.YarnWorkerNode;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.util.Map;
-
 /**
- * Entry point for Yarn session clusters.
+ * {@link ResourceManagerFactory} implementation which creates a {@link YarnResourceManager}.
  */
-public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
-
-	private final String workingDirectory;
-
-	public YarnSessionClusterEntrypoint(
-			Configuration configuration,
-			String workingDirectory) {
-		super(configuration);
-		this.workingDirectory = Preconditions.checkNotNull(workingDirectory);
-	}
+public enum YarnResourceManagerFactory implements ResourceManagerFactory<YarnWorkerNode> {
+	INSTANCE;
 
 	@Override
-	protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
-		return YarnEntrypointUtils.installSecurityContext(configuration, workingDirectory);
-	}
-
-	@Override
-	protected String getRPCPortRange(Configuration configuration) {
-		return configuration.getString(YarnConfigOptions.APPLICATION_MASTER_PORT);
-	}
-
-	@Override
-	protected ResourceManager<?> createResourceManager(
+	public ResourceManager<YarnWorkerNode> createResourceManager(
 			Configuration configuration,
 			ResourceID resourceId,
 			RpcService rpcService,
@@ -101,33 +73,4 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			fatalErrorHandler,
 			webInterfaceUrl);
 	}
-
-	public static void main(String[] args) {
-		// startup checks and logging
-		EnvironmentInformation.logEnvironmentInfo(LOG, YarnSessionClusterEntrypoint.class.getSimpleName(), args);
-		SignalHandler.register(LOG);
-		JvmShutdownSafeguard.installAsShutdownHook(LOG);
-
-		Map<String, String> env = System.getenv();
-
-		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
-		Preconditions.checkArgument(
-			workingDirectory != null,
-			"Working directory variable (%s) not set",
-			ApplicationConstants.Environment.PWD.key());
-
-		try {
-			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
-		} catch (IOException e) {
-			LOG.warn("Could not log YARN environment information.", e);
-		}
-
-		Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env, LOG);
-
-		YarnSessionClusterEntrypoint yarnSessionClusterEntrypoint = new YarnSessionClusterEntrypoint(
-			configuration,
-			workingDirectory);
-
-		yarnSessionClusterEntrypoint.startCluster();
-	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 6c92861..116e2ff 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -19,29 +19,18 @@
 package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.ClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.YarnResourceManager;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -70,36 +59,8 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ResourceManager<?> createResourceManager(
-			Configuration configuration,
-			ResourceID resourceId,
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
-			FatalErrorHandler fatalErrorHandler,
-			ClusterInformation clusterInformation,
-			@Nullable String webInterfaceUrl) throws Exception {
-		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-		final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration(
-			rmServicesConfiguration,
-			highAvailabilityServices,
-			rpcService.getScheduledExecutor());
-
-		return new YarnResourceManager(
-			rpcService,
-			ResourceManager.RESOURCE_MANAGER_NAME,
-			resourceId,
-			configuration,
-			System.getenv(),
-			highAvailabilityServices,
-			heartbeatServices,
-			rmRuntimeServices.getSlotManager(),
-			metricRegistry,
-			rmRuntimeServices.getJobLeaderIdService(),
-			clusterInformation,
-			fatalErrorHandler,
-			webInterfaceUrl);
+	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+		return new SessionClusterComponent(YarnResourceManagerFactory.INSTANCE);
 	}
 
 	public static void main(String[] args) {


[flink] 03/04: [FLINK-10411] Rename ClusterComponent into DispatcherResourceManagerComponent

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54c1b19181408ab280fd3d41c2d83a557e27a519
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 27 14:16:56 2018 +0200

    [FLINK-10411] Rename ClusterComponent into DispatcherResourceManagerComponent
---
 .../{ClusterComponent.java => DispatcherResourceManagerComponent.java}    | 0
 ...obClusterComponent.java => JobDispatcherResourceManagerComponent.java} | 0
 ...usterComponent.java => SessionDispatcherResourceManagerComponent.java} | 0
 3 files changed, 0 insertions(+), 0 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java
similarity index 100%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterComponent.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/DispatcherResourceManagerComponent.java
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java
similarity index 100%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterComponent.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobDispatcherResourceManagerComponent.java
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java
similarity index 100%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterComponent.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionDispatcherResourceManagerComponent.java


[flink] 02/04: [FLINK-10411] Move System.exit out of ClusterEntrypoint

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9ebf421c8979d99f2ae960075b76796ba0b6ac4
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 24 09:27:34 2018 +0200

    [FLINK-10411] Move System.exit out of ClusterEntrypoint
    
    Move the logic of when to exit the JVM process out of the ClusterEntrypoint
    so that the caller is now responsible to make this call. This improves the
    usage of the ClusterEntrypoint for testing purposes.
---
 .../entrypoint/StandaloneJobClusterEntryPoint.java |   4 +-
 .../entrypoint/MesosJobClusterEntrypoint.java      |   5 +-
 .../entrypoint/MesosSessionClusterEntrypoint.java  |   5 +-
 .../runtime/entrypoint/ClusterEntrypoint.java      | 128 +++++++++++----------
 .../entrypoint/ClusterEntrypointException.java     |  40 +++++++
 .../StandaloneSessionClusterEntrypoint.java        |   4 +-
 .../yarn/entrypoint/YarnJobClusterEntrypoint.java  |   5 +-
 .../entrypoint/YarnSessionClusterEntrypoint.java   |   7 +-
 8 files changed, 125 insertions(+), 73 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index cdd44b5..b81d992 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -61,7 +61,7 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new JobClusterComponent(
 			StandaloneResourceManagerFactory.INSTANCE,
 			new ClassPathJobGraphRetriever(jobClassName, savepointRestoreSettings, programArguments));
@@ -94,6 +94,6 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 			clusterConfiguration.getSavepointRestoreSettings(),
 			clusterConfiguration.getArgs());
 
-		entrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
 	}
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index ed2175a..922d5cd 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
 import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
@@ -108,7 +109,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new JobClusterComponent(
 			new MesosResourceManagerFactory(
 				mesosServices,
@@ -141,6 +142,6 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 
 		MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties);
 
-		clusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
 	}
 }
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 0cc3053..f691940 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -107,7 +108,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new SessionClusterComponent(new MesosResourceManagerFactory(
 			mesosServices,
 			mesosConfig,
@@ -138,7 +139,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 
 		MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties);
 
-		clusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(clusterEntrypoint);
 	}
 
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 39e1265..54ccaec 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -73,7 +73,10 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -94,19 +97,19 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
 	protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
 
+	private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = Time.seconds(30L);
+
 	/** The lock to guard startup / shutdown / manipulation methods. */
 	private final Object lock = new Object();
 
 	private final Configuration configuration;
 
-	private final CompletableFuture<Void> terminationFuture;
-
-	private final AtomicBoolean isTerminating = new AtomicBoolean(false);
+	private final CompletableFuture<ApplicationStatus> terminationFuture;
 
 	private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
 	@GuardedBy("lock")
-	private ClusterComponent<?> dispatcherComponent;
+	private ClusterComponent<?> clusterComponent;
 
 	@GuardedBy("lock")
 	private MetricRegistryImpl metricRegistry;
@@ -138,11 +141,11 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG);
 	}
 
-	public CompletableFuture<Void> getTerminationFuture() {
+	public CompletableFuture<ApplicationStatus> getTerminationFuture() {
 		return terminationFuture;
 	}
 
-	protected void startCluster() {
+	protected void startCluster() throws ClusterEntrypointException {
 		LOG.info("Starting {}.", getClass().getSimpleName());
 
 		try {
@@ -157,17 +160,24 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			});
 		} catch (Throwable t) {
 			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
-			LOG.error("Cluster initialization failed.", strippedThrowable);
 
-			shutDownAndTerminate(
-				STARTUP_FAILURE_RETURN_CODE,
-				ApplicationStatus.FAILED,
-				strippedThrowable.getMessage(),
-				false);
+			try {
+				// clean up any partial state
+				shutDownAsync(
+					ApplicationStatus.FAILED,
+					ExceptionUtils.stringifyException(strippedThrowable),
+					false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+			} catch (InterruptedException | ExecutionException | TimeoutException e) {
+				strippedThrowable.addSuppressed(e);
+			}
+
+			throw new ClusterEntrypointException(
+				String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()),
+				strippedThrowable);
 		}
 	}
 
-	protected void configureFileSystems(Configuration configuration) throws Exception {
+	private void configureFileSystems(Configuration configuration) throws Exception {
 		LOG.info("Install default filesystem.");
 
 		try {
@@ -194,9 +204,9 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
 			configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
 
-			dispatcherComponent = createDispatcherComponent(configuration);
+			clusterComponent = createClusterComponent(configuration);
 
-			dispatcherComponent.startComponent(
+			clusterComponent.startComponent(
 				configuration,
 				commonRpcService,
 				haServices,
@@ -206,19 +216,17 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				archivedExecutionGraphStore,
 				this);
 
-			dispatcherComponent.getShutDownFuture().whenComplete(
+			clusterComponent.getShutDownFuture().whenComplete(
 				(ApplicationStatus applicationStatus, Throwable throwable) -> {
 					if (throwable != null) {
-						shutDownAndTerminate(
-							RUNTIME_FAILURE_RETURN_CODE,
+						shutDownAsync(
 							ApplicationStatus.UNKNOWN,
 							ExceptionUtils.stringifyException(throwable),
 							false);
 					} else {
 						// This is the general shutdown path. If a separate more specific shutdown was
 						// already triggered, this will do nothing
-						shutDownAndTerminate(
-							applicationStatus.processExitCode(),
+						shutDownAsync(
 							applicationStatus,
 							null,
 							true);
@@ -382,12 +390,15 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return resultConfiguration;
 	}
 
-	private CompletableFuture<Void> shutDownAsync(
-			boolean cleanupHaData,
+	private CompletableFuture<ApplicationStatus> shutDownAsync(
 			ApplicationStatus applicationStatus,
-			@Nullable String diagnostics) {
+			@Nullable String diagnostics,
+			boolean cleanupHaData) {
 		if (isShutDown.compareAndSet(false, true)) {
-			LOG.info("Stopping {}.", getClass().getSimpleName());
+			LOG.info("Shutting {} down with application status {}. Diagnostics {}.",
+				getClass().getSimpleName(),
+				applicationStatus,
+				diagnostics);
 
 			final CompletableFuture<Void> shutDownApplicationFuture = closeClusterComponent(applicationStatus, diagnostics);
 
@@ -404,7 +415,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 					if (serviceThrowable != null) {
 						terminationFuture.completeExceptionally(serviceThrowable);
 					} else {
-						terminationFuture.complete(null);
+						terminationFuture.complete(applicationStatus);
 					}
 				});
 		}
@@ -412,37 +423,6 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return terminationFuture;
 	}
 
-	protected void shutDownAndTerminate(
-		int returnCode,
-		ApplicationStatus applicationStatus,
-		@Nullable String diagnostics,
-		boolean cleanupHaData) {
-
-		if (isTerminating.compareAndSet(false, true)) {
-			LOG.info("Shut down and terminate {} with return code {} and application status {}. Diagnostics {}.",
-				getClass().getSimpleName(),
-				returnCode,
-				applicationStatus,
-				diagnostics);
-
-			shutDownAsync(
-				cleanupHaData,
-				applicationStatus,
-				diagnostics).whenComplete(
-				(Void ignored, Throwable t) -> {
-					if (t != null) {
-						LOG.info("Could not properly shut down cluster entrypoint.", t);
-					}
-
-					System.exit(returnCode);
-				});
-		} else {
-			LOG.debug("Concurrent termination call detected. Ignoring termination call with return code {} and application status {}.",
-				returnCode,
-				applicationStatus);
-		}
-	}
-
 	/**
 	 * Deregister the Flink application from the resource management system by signalling
 	 * the {@link ResourceManager}.
@@ -453,10 +433,10 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	 */
 	private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
 		synchronized (lock) {
-			if (dispatcherComponent != null) {
-				final CompletableFuture<Void> deregisterApplicationFuture = dispatcherComponent.deregisterApplication(applicationStatus, diagnostics);
+			if (clusterComponent != null) {
+				final CompletableFuture<Void> deregisterApplicationFuture = clusterComponent.deregisterApplication(applicationStatus, diagnostics);
 
-				return FutureUtils.runAfterwards(deregisterApplicationFuture, dispatcherComponent::closeAsync);
+				return FutureUtils.runAfterwards(deregisterApplicationFuture, clusterComponent::closeAsync);
 			} else {
 				return CompletableFuture.completedFuture(null);
 			}
@@ -480,7 +460,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	// Abstract methods
 	// --------------------------------------------------
 
-	protected abstract ClusterComponent<?> createDispatcherComponent(Configuration configuration);
+	protected abstract ClusterComponent<?> createClusterComponent(Configuration configuration);
 
 	protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
 		Configuration configuration,
@@ -511,6 +491,34 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return configuration;
 	}
 
+	// --------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------
+
+	public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
+
+		final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
+		try {
+			clusterEntrypoint.startCluster();
+		} catch (ClusterEntrypointException e) {
+			LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
+			System.exit(STARTUP_FAILURE_RETURN_CODE);
+		}
+
+		clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
+			final int returnCode;
+
+			if (throwable != null) {
+				returnCode = RUNTIME_FAILURE_RETURN_CODE;
+			} else {
+				returnCode = applicationStatus.processExitCode();
+			}
+
+			LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
+			System.exit(returnCode);
+		});
+	}
+
 	/**
 	 * Execution mode of the {@link MiniDispatcher}.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
new file mode 100644
index 0000000..21d37ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exceptions thrown by the {@link ClusterEntrypoint}.
+ */
+public class ClusterEntrypointException extends FlinkException {
+	private static final long serialVersionUID = -3855286807063809945L;
+
+	public ClusterEntrypointException(String message) {
+		super(message);
+	}
+
+	public ClusterEntrypointException(Throwable cause) {
+		super(cause);
+	}
+
+	public ClusterEntrypointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index e92248c..1675235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -35,7 +35,7 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new SessionClusterComponent(StandaloneResourceManagerFactory.INSTANCE);
 	}
 
@@ -60,6 +60,6 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 
 		StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);
 
-		entrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(entrypoint);
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index a52975a..1733f49 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterComponent;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.FileJobGraphRetriever;
 import org.apache.flink.runtime.entrypoint.JobClusterComponent;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
@@ -61,7 +62,7 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new JobClusterComponent(
 			YarnResourceManagerFactory.INSTANCE,
 			FileJobGraphRetriever.createFrom(configuration));
@@ -98,6 +99,6 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 			configuration,
 			workingDirectory);
 
-		yarnJobClusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
 	}
 }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index 116e2ff..e0bebfd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -20,8 +20,9 @@ package org.apache.flink.yarn.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.entrypoint.ClusterComponent;
-import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.SessionClusterComponent;
+import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -59,7 +60,7 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected ClusterComponent<?> createDispatcherComponent(Configuration configuration) {
+	protected ClusterComponent<?> createClusterComponent(Configuration configuration) {
 		return new SessionClusterComponent(YarnResourceManagerFactory.INSTANCE);
 	}
 
@@ -89,6 +90,6 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			configuration,
 			workingDirectory);
 
-		yarnSessionClusterEntrypoint.startCluster();
+		ClusterEntrypoint.runClusterEntrypoint(yarnSessionClusterEntrypoint);
 	}
 }