You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 08:01:23 UTC
[2/2] flink git commit: [FLINK-8030] Instantiate JobMasterRestEndpoint in JobClusterEntrypoint
[FLINK-8030] Instantiate JobMasterRestEndpoint in JobClusterEntrypoint
This closes #4988.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d551640
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d551640
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d551640
Branch: refs/heads/master
Commit: 0d551640e65073afd8755e04f7817a6379149251
Parents: 0e3027d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 17:27:38 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 22:35:21 2017 +0100
----------------------------------------------------------------------
.../entrypoint/JobClusterEntrypoint.java | 107 ++++++++++++++++++-
.../runtime/jobmaster/JobManagerRunner.java | 4 +
.../runtime/webmonitor/WebMonitorEndpoint.java | 2 +-
3 files changed, 110 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 1c8fb21..bd1f573 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.entrypoint;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -29,16 +31,35 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
+import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import akka.actor.ActorSystem;
+
import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
/**
* Base class for per-job cluster entry points.
*/
@@ -48,6 +69,12 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
private JobManagerServices jobManagerServices;
+ private JobMasterRestEndpoint jobMasterRestEndpoint;
+
+ private LeaderRetrievalService jobMasterRetrievalService;
+
+ private LeaderRetrievalService resourceManagerRetrievalService;
+
private JobManagerRunner jobManagerRunner;
public JobClusterEntrypoint(Configuration configuration) {
@@ -74,6 +101,36 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
jobManagerServices = JobManagerServices.fromConfiguration(configuration, blobServer);
+ resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
+
+ final LeaderGatewayRetriever<JobMasterGateway> jobMasterGatewayRetriever = new RpcGatewayRetriever<>(
+ rpcService,
+ JobMasterGateway.class,
+ JobMasterId::new,
+ 10,
+ Time.milliseconds(50L));
+
+ final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
+ rpcService,
+ ResourceManagerGateway.class,
+ ResourceManagerId::new,
+ 10,
+ Time.milliseconds(50L));
+
+ // TODO: Remove once we have ported the MetricFetcher to the RpcEndpoint
+ final ActorSystem actorSystem = ((AkkaRpcService) rpcService).getActorSystem();
+ final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+ jobMasterRestEndpoint = createJobMasterRestEndpoint(
+ configuration,
+ jobMasterGatewayRetriever,
+ resourceManagerGatewayRetriever,
+ rpcService.getExecutor(),
+ new AkkaQueryServiceRetriever(actorSystem, timeout));
+
+ LOG.debug("Starting JobMaster REST endpoint.");
+ jobMasterRestEndpoint.start();
+
jobManagerRunner = createJobManagerRunner(
configuration,
ResourceID.generate(),
@@ -83,13 +140,39 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
heartbeatServices,
metricRegistry,
this,
- null);
+ jobMasterRestEndpoint.getRestAddress());
LOG.debug("Starting ResourceManager.");
resourceManager.start();
+ resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
LOG.debug("Starting JobManager.");
jobManagerRunner.start();
+
+ jobMasterRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+ jobManagerRunner.getJobGraph().getJobID(),
+ jobManagerRunner.getAddress());
+ jobMasterRetrievalService.start(jobMasterGatewayRetriever);
+ }
+
+ protected JobMasterRestEndpoint createJobMasterRestEndpoint(
+ Configuration configuration,
+ GatewayRetriever<JobMasterGateway> jobMasterGatewayRetriever,
+ GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+ Executor executor,
+ MetricQueryServiceRetriever metricQueryServiceRetriever) throws ConfigurationException {
+
+ final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);
+
+ return new JobMasterRestEndpoint(
+ RestServerEndpointConfiguration.fromConfiguration(configuration),
+ jobMasterGatewayRetriever,
+ configuration,
+ restHandlerConfiguration,
+ resourceManagerGatewayRetriever,
+ executor,
+ metricQueryServiceRetriever);
+
}
protected JobManagerRunner createJobManagerRunner(
@@ -103,7 +186,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
FatalErrorHandler fatalErrorHandler,
@Nullable String restAddress) throws Exception {
- JobGraph jobGraph = retrieveJobGraph(configuration);
+ final JobGraph jobGraph = retrieveJobGraph(configuration);
return new JobManagerRunner(
resourceId,
@@ -123,6 +206,18 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
Throwable exception = null;
+ if (jobMasterRestEndpoint != null) {
+ jobMasterRestEndpoint.shutdown(Time.seconds(10L));
+ }
+
+ if (jobMasterRetrievalService != null) {
+ try {
+ jobMasterRetrievalService.stop();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
if (jobManagerRunner != null) {
try {
jobManagerRunner.shutdown();
@@ -139,6 +234,14 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
}
}
+ if (resourceManagerRetrievalService != null) {
+ try {
+ resourceManagerRetrievalService.stop();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
if (resourceManager != null) {
try {
resourceManager.shutDown();
http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index ed3d43d..e699d6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -192,6 +192,10 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
return jobManager.getSelfGateway(JobMasterGateway.class);
}
+ public JobGraph getJobGraph() {
+ return jobGraph;
+ }
+
//----------------------------------------------------------------------------------------------
// Lifecycle management
//----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d551640/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5835c87..703a754 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -142,7 +142,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
- ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3);
+ ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);
final Time timeout = restConfiguration.getTimeout();
final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders();