You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/26 16:28:28 UTC
[1/2] flink git commit: [FLINK-7082] Add generic entry point for session and per-job clusters
Repository: flink
Updated Branches:
refs/heads/master ee16ee5a2 -> 5e19a0da6
[FLINK-7082] Add generic entry point for session and per-job clusters
This closes #4261.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e08ebce9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e08ebce9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e08ebce9
Branch: refs/heads/master
Commit: e08ebce906a0a8b05e728c8d3c275433f52788b9
Parents: ee16ee5
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 6 17:16:18 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 26 18:03:16 2017 +0200
----------------------------------------------------------------------
.../clusterframework/BootstrapTools.java | 8 +-
.../dispatcher/StandaloneDispatcher.java | 2 +-
.../entrypoint/ClusterConfiguration.java | 37 +++
.../runtime/entrypoint/ClusterEntrypoint.java | 247 +++++++++++++++++++
.../entrypoint/JobClusterEntrypoint.java | 185 ++++++++++++++
.../entrypoint/SessionClusterEntrypoint.java | 135 ++++++++++
.../resourcemanager/ResourceManager.java | 6 +
.../yarn/YarnFlinkApplicationMasterRunner.java | 7 +-
8 files changed, 619 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e08ebce9/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index a98e574..fd30e43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -69,10 +69,10 @@ public class BootstrapTools {
* @throws Exception
*/
public static ActorSystem startActorSystem(
- Configuration configuration,
- String listeningAddress,
- String portRangeDefinition,
- Logger logger) throws Exception {
+ Configuration configuration,
+ String listeningAddress,
+ String portRangeDefinition,
+ Logger logger) throws Exception {
// parse port range definition and create port iterator
Iterator<Integer> portsIterator;
http://git-wip-us.apache.org/repos/asf/flink/blob/e08ebce9/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 6687657..54d698e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.rpc.RpcService;
* can be used as the default for all different session clusters.
*/
public class StandaloneDispatcher extends Dispatcher {
- protected StandaloneDispatcher(
+ public StandaloneDispatcher(
RpcService rpcService,
String endpointId,
Configuration configuration,
http://git-wip-us.apache.org/repos/asf/flink/blob/e08ebce9/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
new file mode 100644
index 0000000..dbec0b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterConfiguration.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration class which contains the parsed command line arguments for
+ * the {@link ClusterEntrypoint}.
+ */
+public class ClusterConfiguration {
+ private final String configDir;
+
+ public ClusterConfiguration(String configDir) {
+ this.configDir = Preconditions.checkNotNull(configDir);
+ }
+
+ public String getConfigDir() {
+ return configDir;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e08ebce9/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
new file mode 100644
index 0000000..fa866e4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import akka.actor.ActorSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Base class for the Flink cluster entry points.
+ *
+ * <p>Specialization of this class can be used for the session mode and the per-job mode.
+ */
+public abstract class ClusterEntrypoint implements FatalErrorHandler {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);
+
+ protected static final int SUCCESS_RETURN_CODE = 0;
+ protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
+ protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+ /** The lock to guard startup / shutdown / manipulation methods. */
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private MetricRegistry metricRegistry = null;
+
+ @GuardedBy("lock")
+ private HighAvailabilityServices haServices = null;
+
+ @GuardedBy("lock")
+ private BlobServer blobServer = null;
+
+ @GuardedBy("lock")
+ private HeartbeatServices heartbeatServices = null;
+
+ @GuardedBy("lock")
+ private RpcService commonRpcService = null;
+
+ protected void startCluster(String[] args) {
+ final ClusterConfiguration clusterConfiguration = parseArguments(args);
+
+ final Configuration configuration = loadConfiguration(clusterConfiguration);
+
+ try {
+ SecurityContext securityContext = installSecurityContext(configuration);
+
+ securityContext.runSecured(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ runCluster(configuration);
+
+ return null;
+ }
+ });
+ } catch (Throwable t) {
+ LOG.error("Cluster initialization failed.", t);
+
+ try {
+ shutDown(false);
+ } catch (Throwable st) {
+ LOG.error("Could not properly shut down cluster entrypoint.", st);
+ }
+
+ System.exit(STARTUP_FAILURE_RETURN_CODE);
+ }
+ }
+
+ protected ClusterConfiguration parseArguments(String[] args) {
+ ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+ final String configDir = parameterTool.get("configDir", "");
+
+ return new ClusterConfiguration(configDir);
+ }
+
+ protected Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
+ return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir());
+ }
+
+ protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
+ SecurityUtils.install(new SecurityUtils.SecurityConfiguration(configuration));
+
+ return SecurityUtils.getInstalledContext();
+ }
+
+ protected void runCluster(Configuration configuration) throws Exception {
+ synchronized (lock) {
+ initializeServices(configuration);
+
+ startClusterComponents(
+ configuration,
+ commonRpcService,
+ haServices,
+ blobServer,
+ heartbeatServices,
+ metricRegistry);
+ }
+ }
+
+ protected void initializeServices(Configuration configuration) throws Exception {
+ assert(Thread.holdsLock(lock));
+
+ LOG.info("Initializing cluster services.");
+
+ final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
+ // TODO: Add support for port ranges
+ final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
+
+ commonRpcService = createRpcService(configuration, bindAddress, portRange);
+ haServices = createHaServices(configuration, commonRpcService.getExecutor());
+ blobServer = new BlobServer(configuration, haServices.createBlobStore());
+ heartbeatServices = createHeartbeatServices(configuration);
+ metricRegistry = createMetricRegistry(configuration);
+ }
+
+ protected RpcService createRpcService(
+ Configuration configuration,
+ String bindAddress,
+ String portRange) throws Exception {
+ ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
+ FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+ return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
+ }
+
+ protected HighAvailabilityServices createHaServices(
+ Configuration configuration,
+ Executor executor) throws Exception {
+ return HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ configuration,
+ executor,
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+ }
+
+ protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
+ return HeartbeatServices.fromConfiguration(configuration);
+ }
+
+ protected MetricRegistry createMetricRegistry(Configuration configuration) {
+ return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+ }
+
+ protected void shutDown(boolean cleanupHaData) throws FlinkException {
+ Throwable exception = null;
+
+ synchronized (lock) {
+ if (metricRegistry != null) {
+ try {
+ metricRegistry.shutdown();
+ } catch (Throwable t) {
+ exception = t;
+ }
+ }
+
+ if (blobServer != null) {
+ try {
+ blobServer.close();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (haServices != null) {
+ try {
+ if (cleanupHaData) {
+ haServices.closeAndCleanupAllData();
+ } else {
+ haServices.close();
+ }
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ if (commonRpcService != null) {
+ try {
+ commonRpcService.stopService();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+ }
+
+ if (exception != null) {
+ throw new FlinkException("Could not properly shut down the cluster services.", exception);
+ }
+ }
+
+ @Override
+ public void onFatalError(Throwable exception) {
+ LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
+
+ System.exit(RUNTIME_FAILURE_RETURN_CODE);
+ }
+
+ protected abstract void startClusterComponents(
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobServer blobServer,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e08ebce9/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
new file mode 100644
index 0000000..4133f07
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for per-job cluster entry points.
+ */
+public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
+
+ private ResourceManager<?> resourceManager;
+
+ private JobManagerRunner jobManagerRunner;
+
+ @Override
+ protected void startClusterComponents(
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobServer blobServer,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry) throws Exception {
+
+ resourceManager = createResourceManager(
+ configuration,
+ ResourceID.generate(),
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ metricRegistry);
+
+ jobManagerRunner = createJobManagerRunner(
+ configuration,
+ ResourceID.generate(),
+ rpcService,
+ highAvailabilityServices,
+ blobServer,
+ heartbeatServices,
+ metricRegistry);
+
+ LOG.debug("Starting ResourceManager.");
+ resourceManager.start();
+
+ LOG.debug("Starting JobManager.");
+ jobManagerRunner.start();
+ }
+
+ protected JobManagerRunner createJobManagerRunner(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobService blobService,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry) throws Exception {
+
+ JobGraph jobGraph = retrieveJobGraph(configuration);
+
+ return new JobManagerRunner(
+ resourceId,
+ jobGraph,
+ configuration,
+ rpcService,
+ highAvailabilityServices,
+ blobService,
+ heartbeatServices,
+ metricRegistry,
+ new TerminatingOnCompleteActions(jobGraph.getJobID()),
+ this);
+ }
+
+ @Override
+ protected void shutDown(boolean cleanupHaData) throws FlinkException {
+ Throwable exception = null;
+
+ if (jobManagerRunner != null) {
+ try {
+ jobManagerRunner.shutdown();
+ } catch (Throwable t) {
+ exception = t;
+ }
+ }
+
+ if (resourceManager != null) {
+ try {
+ resourceManager.shutDown();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ try {
+ super.shutDown(cleanupHaData);
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
+ if (exception != null) {
+ throw new FlinkException("Could not properly shut down the session cluster entry point.", exception);
+ }
+ }
+
+ private void shutDownAndTerminate(boolean cleanupHaData) {
+ try {
+ shutDown(cleanupHaData);
+ } catch (Throwable t) {
+ LOG.error("Could not properly shut down cluster entrypoint.", t);
+ }
+
+ System.exit(SUCCESS_RETURN_CODE);
+ }
+
+ protected abstract ResourceManager<?> createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry) throws Exception;
+
+ protected abstract JobGraph retrieveJobGraph(Configuration configuration);
+
+ private final class TerminatingOnCompleteActions implements OnCompletionActions {
+
+ private final JobID jobId;
+
+ private TerminatingOnCompleteActions(JobID jobId) {
+ this.jobId = Preconditions.checkNotNull(jobId);
+ }
+
+ @Override
+ public void jobFinished(JobExecutionResult result) {
+ LOG.info("Job({}) finished.", jobId);
+
+ shutDownAndTerminate(true);
+ }
+
+ @Override
+ public void jobFailed(Throwable cause) {
+ LOG.info("Job({}) failed.", jobId, cause);
+
+ shutDownAndTerminate(false);
+ }
+
+ @Override
+ public void jobFinishedByOther() {
+ LOG.info("Job({}) was finished by another JobManager.", jobId);
+
+ shutDownAndTerminate(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e08ebce9/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
new file mode 100644
index 0000000..02a5e4a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for session cluster entry points.
+ */
+public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
+
+ private ResourceManager<?> resourceManager;
+
+ private Dispatcher dispatcher;
+
+ @Override
+ protected void startClusterComponents(
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobServer blobServer,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry) throws Exception {
+
+ resourceManager = createResourceManager(
+ configuration,
+ ResourceID.generate(),
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ metricRegistry,
+ this);
+
+ dispatcher = createDispatcher(
+ rpcService,
+ highAvailabilityServices,
+ blobServer,
+ heartbeatServices,
+ metricRegistry,
+ this);
+
+ LOG.debug("Starting ResourceManager.");
+ resourceManager.start();
+
+ LOG.debug("Starting Dispatcher.");
+ dispatcher.start();
+ }
+
+ @Override
+ protected void shutDown(boolean cleanupHaData) throws FlinkException {
+ Throwable exception = null;
+
+ if (dispatcher != null) {
+ try {
+ dispatcher.shutDown();
+ } catch (Throwable t) {
+ exception = t;
+ }
+ }
+
+ if (resourceManager != null) {
+ try {
+ resourceManager.shutDown();
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+ }
+
+ try {
+ super.shutDown(cleanupHaData);
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
+ if (exception != null) {
+ throw new FlinkException("Could not properly shut down the session cluster entry point.", exception);
+ }
+ }
+
+ protected Dispatcher createDispatcher(
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ BlobServer blobServer,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler) throws Exception {
+
+ // create the default dispatcher
+ return new StandaloneDispatcher(
+ rpcService,
+ Dispatcher.DISPATCHER_NAME,
+ highAvailabilityServices,
+ blobServer,
+ heartbeatServices,
+ metricRegistry,
+ fatalErrorHandler);
+ }
+
+ protected abstract ResourceManager<?> createResourceManager(
+ Configuration configuration,
+ ResourceID resourceId,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ FatalErrorHandler fatalErrorHandler) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e08ebce9/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index d2e0222..ebba9ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -224,6 +224,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
try {
+ jobLeaderIdService.stop();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ try {
super.shutDown();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
http://git-wip-us.apache.org/repos/asf/flink/blob/e08ebce9/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 0107f80..1f5af17 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -70,9 +71,9 @@ import scala.concurrent.duration.FiniteDuration;
* <p>It starts actor system and the actors for {@link JobManagerRunner}
* and {@link YarnResourceManager}.
*
- * <p>The JobManagerRunner start a {@link org.apache.flink.runtime.jobmaster.JobMaster}
- * JobMaster handles Flink job execution, while the YarnResourceManager handles container
- * allocation and failure detection.
+ * <p>The JobManagerRunner starts a {@link JobMaster}. The JobMaster handles Flink job
+ * execution, while the YarnResourceManager handles container allocation and failure
+ * detection.
*/
public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicationMasterRunner
implements OnCompletionActions, FatalErrorHandler {
[2/2] flink git commit: [FLINK-7135] [flip-6] Pass in proper configuration to Dispatcher component
Posted by tr...@apache.org.
[FLINK-7135] [flip-6] Pass in proper configuration to Dispatcher component
This closes #4286.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e19a0da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e19a0da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e19a0da
Branch: refs/heads/master
Commit: 5e19a0da63d578dd0638cba9b112076eed530fa8
Parents: e08ebce
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 26 12:02:19 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 26 18:03:17 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5e19a0da/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 02a5e4a..4c7df1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -60,6 +60,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
this);
dispatcher = createDispatcher(
+ configuration,
rpcService,
highAvailabilityServices,
blobServer,
@@ -106,6 +107,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
}
protected Dispatcher createDispatcher(
+ Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
@@ -117,6 +119,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
return new StandaloneDispatcher(
rpcService,
Dispatcher.DISPATCHER_NAME,
+ configuration,
highAvailabilityServices,
blobServer,
heartbeatServices,