You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 08:01:23 UTC

[2/2] flink git commit: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClusterEntrypoint

[FLINK-8030] Instantiate JobMasterRestEndpoint in JobClusterEntrypoint

This closes #4988.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d551640
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d551640
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d551640

Branch: refs/heads/master
Commit: 0d551640e65073afd8755e04f7817a6379149251
Parents: 0e3027d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 17:27:38 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 22:35:21 2017 +0100

----------------------------------------------------------------------
 .../entrypoint/JobClusterEntrypoint.java        | 107 ++++++++++++++++++-
 .../runtime/jobmaster/JobManagerRunner.java     |   4 +
 .../runtime/webmonitor/WebMonitorEndpoint.java  |   2 +-
 3 files changed, 110 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 1c8fb21..bd1f573 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -29,16 +31,35 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
+import akka.actor.ActorSystem;
+
 import javax.annotation.Nullable;
 
+import java.util.concurrent.Executor;
+
 /**
  * Base class for per-job cluster entry points.
  */
@@ -48,6 +69,12 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 
 	private JobManagerServices jobManagerServices;
 
+	private JobMasterRestEndpoint jobMasterRestEndpoint;
+
+	private LeaderRetrievalService jobMasterRetrievalService;
+
+	private LeaderRetrievalService resourceManagerRetrievalService;
+
 	private JobManagerRunner jobManagerRunner;
 
 	public JobClusterEntrypoint(Configuration configuration) {
@@ -74,6 +101,36 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 
 		jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);
 
+		resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
+
+		final LeaderGatewayRetriever<JobMasterGateway> jobMasterGatewayRetriever = new RpcGatewayRetriever<>(
+			rpcService,
+			JobMasterGateway.class,
+			JobMasterId::new,
+			10,
+			Time.milliseconds(50L));
+
+		final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+			rpcService,
+			ResourceManagerGateway.class,
+			ResourceManagerId::new,
+			10,
+			Time.milliseconds(50L));
+
+		// TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
+		final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+		jobMasterRestEndpoint = createJobMasterRestEndpoint(
+			configuration,
+			jobMasterGatewayRetriever,
+			resourceManagerGatewayRetriever,
+			rpcService.getExecutor(),
+			new AkkaQueryServiceRetriever(actorSystem, timeout));
+
+		LOG.debug("Starting JobMaster REST endpoint.");
+		jobMasterRestEndpoint.start();
+
 		jobManagerRunner = createJobManagerRunner(
 			configuration,
 			ResourceID.generate(),
@@ -83,13 +140,39 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			heartbeatServices,
 			metricRegistry,
 			this,
-			null);
+			jobMasterRestEndpoint.getRestAddress());
 
 		LOG.debug("Starting ResourceManager.");
 		resourceManager.start();
+		resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
 
 		LOG.debug("Starting JobManager.");
 		jobManagerRunner.start();
+
+		jobMasterRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+			jobManagerRunner.getJobGraph().getJobID(),
+			jobManagerRunner.getAddress());
+		jobMasterRetrievalService.start(jobMasterGatewayRetriever);
+	}
+
+	protected JobMasterRestEndpoint createJobMasterRestEndpoint(
+			Configuration configuration,
+			GatewayRetriever<JobMasterGateway> jobMasterGatewayRetriever,
+			GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+			Executor executor,
+			MetricQueryServiceRetriever metricQueryServiceRetriever) throws ConfigurationException {
+
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+		return new JobMasterRestEndpoint(
+			RestServerEndpointConfiguration.fromConfiguration(configuration),
+			jobMasterGatewayRetriever,
+			configuration,
+			restHandlerConfiguration,
+			resourceManagerGatewayRetriever,
+			executor,
+			metricQueryServiceRetriever);
+
 	}
 
 	protected JobManagerRunner createJobManagerRunner(
@@ -103,7 +186,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress) throws Exception {
 
-		JobGraph jobGraph = retrieveJobGraph(configuration);
+		final JobGraph jobGraph = retrieveJobGraph(configuration);
 
 		return new JobManagerRunner(
 			resourceId,
@@ -123,6 +206,18 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 	protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
 		Throwable exception = null;
 
+		if (jobMasterRestEndpoint != null) {
+			jobMasterRestEndpoint.shutdown(Time.seconds(10L));
+		}
+
+		if (jobMasterRetrievalService != null) {
+			try {
+				jobMasterRetrievalService.stop();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+		}
+
 		if (jobManagerRunner != null) {
 			try {
 				jobManagerRunner.shutdown();
@@ -139,6 +234,14 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			}
 		}
 
+		if (resourceManagerRetrievalService != null) {
+			try {
+				resourceManagerRetrievalService.stop();
+			} catch (Throwable t) {
+				exception = ExceptionUtils.firstOrSuppressed(t, exception);
+			}
+		}
+
 		if (resourceManager != null) {
 			try {
 				resourceManager.shutDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index ed3d43d..e699d6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -192,6 +192,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 		return jobManager.getSelfGateway(JobMasterGateway.class);
 	}
 
+	public JobGraph getJobGraph() {
+		return jobGraph;
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Lifecycle management
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5835c87..703a754 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -142,7 +142,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
 	@Override
 	protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
-		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3);
+		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
 
 		final Time timeout = restConfiguration.getTimeout();
 		final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();