You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:26 UTC
[07/52] [abbrv] flink git commit: [FLINK-4836] [cluster management]
Add flink mini cluster (part 1)
[FLINK-4836] [cluster management] Add flink mini cluster (part 1)
This implements
- mini cluster configuration
- startup / shutdown of common services (rpc, ha)
- startup / shutdown of JobManager and Dispatcher
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/106cb9e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/106cb9e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/106cb9e3
Branch: refs/heads/master
Commit: 106cb9e3ddb8ad019db99bc746e77c2ef48cb5e4
Parents: 9615f15
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Oct 15 00:25:41 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100
----------------------------------------------------------------------
.../org/apache/flink/util/ExceptionUtils.java | 12 +-
.../HighAvailabilityServicesUtils.java | 17 +
.../highavailability/ZookeeperHaServices.java | 2 +-
.../runtime/jobmaster/JobManagerRunner.java | 1 -
.../jobmaster/MiniClusterJobDispatcher.java | 394 -----------------
.../flink/runtime/minicluster/MiniCluster.java | 406 ++++++++++++++++++
.../minicluster/MiniClusterConfiguration.java | 147 +++++++
.../minicluster/MiniClusterJobDispatcher.java | 418 +++++++++++++++++++
.../resourcemanager/ResourceManager.java | 2 +-
.../runtime/taskexecutor/JobLeaderService.java | 3 +-
.../TestingHighAvailabilityServices.java | 2 +-
.../runtime/minicluster/MiniClusterITCase.java | 79 ++++
12 files changed, 1075 insertions(+), 408 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index d1357a8..1069f2d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -110,8 +110,6 @@ public final class ExceptionUtils {
}
}
-
-
/**
* Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
* throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions
@@ -214,10 +212,8 @@ public final class ExceptionUtils {
}
}
- /**
- * Private constructor to prevent instantiation.
- */
- private ExceptionUtils() {
- throw new RuntimeException();
- }
+ // ------------------------------------------------------------------------
+
+ /** Private constructor to prevent instantiation. */
+ private ExceptionUtils() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index f3da847..9113309 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -24,6 +24,23 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
public class HighAvailabilityServicesUtils {
+    /**
+     * Creates the high-availability services for the mode selected in the given configuration.
+     *
+     * <p>For the mode {@code NONE}, an {@link EmbeddedNonHaServices} instance is returned.
+     * The mode {@code ZOOKEEPER} is recognized, but not supported by this method yet.
+     *
+     * @param config The configuration from which the high-availability mode is read.
+     * @return The high-availability services for the configured mode.
+     *
+     * @throws UnsupportedOperationException Thrown, if the configured mode is {@code ZOOKEEPER}.
+     * @throws Exception Thrown, if the configured mode is none of the modes handled here.
+     */
+    public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configuration config) throws Exception {
+        final HighAvailabilityMode mode = LeaderRetrievalUtils.getRecoveryMode(config);
+
+        switch (mode) {
+            case ZOOKEEPER:
+                throw new UnsupportedOperationException(
+                    "ZooKeeper high availability services have not been implemented yet.");
+
+            case NONE:
+                return new EmbeddedNonHaServices();
+
+            default:
+                throw new Exception("High availability mode " + mode + " is not supported.");
+        }
+    }
+
+
public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index be19c60..e38840b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -113,7 +113,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 74ca6f3..3313d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -289,7 +289,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
@Override
public void jobFinishedByOther() {
try {
- unregisterJobFromHighAvailability();
shutdownInternally();
}
finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
deleted file mode 100644
index 019ccfe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
- * upon receiving jobs.
- */
-public class MiniClusterJobDispatcher {
-
- private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
-
- // ------------------------------------------------------------------------
-
- /** lock to ensure that this dispatcher executes only one job at a time */
- private final Object lock = new Object();
-
- /** the configuration with which the mini cluster was started */
- private final Configuration configuration;
-
- /** the RPC service to use by the job managers */
- private final RpcService rpcService;
-
- /** services for discovery, leader election, and recovery */
- private final HighAvailabilityServices haServices;
-
- /** al the services that the JobManager needs, such as BLOB service, factories, etc */
- private final JobManagerServices jobManagerServices;
-
- /** Registry for all metrics in the mini cluster */
- private final MetricRegistry metricRegistry;
-
- /** The number of JobManagers to launch (more than one simulates a high-availability setup) */
- private final int numJobManagers;
-
- /** The runner for the job and master. non-null if a job is currently running */
- private volatile JobManagerRunner[] runners;
-
- /** flag marking the dispatcher as hut down */
- private volatile boolean shutdown;
-
-
- /**
- * Starts a mini cluster job dispatcher.
- *
- * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
- * non-highly-available setup.
- *
- * @param config The configuration of the mini cluster
- * @param haServices Access to the discovery, leader election, and recovery services
- *
- * @throws Exception Thrown, if the services for the JobMaster could not be started.
- */
- public MiniClusterJobDispatcher(
- Configuration config,
- RpcService rpcService,
- HighAvailabilityServices haServices,
- MetricRegistry metricRegistry) throws Exception {
- this(config, rpcService, haServices, metricRegistry, 1);
- }
-
- /**
- * Starts a mini cluster job dispatcher.
- *
- * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
- * a highly-available setup.
- *
- * @param config The configuration of the mini cluster
- * @param haServices Access to the discovery, leader election, and recovery services
- * @param numJobManagers The number of JobMasters to start for each job.
- *
- * @throws Exception Thrown, if the services for the JobMaster could not be started.
- */
- public MiniClusterJobDispatcher(
- Configuration config,
- RpcService rpcService,
- HighAvailabilityServices haServices,
- MetricRegistry metricRegistry,
- int numJobManagers) throws Exception {
-
- checkArgument(numJobManagers >= 1);
- this.configuration = checkNotNull(config);
- this.rpcService = checkNotNull(rpcService);
- this.haServices = checkNotNull(haServices);
- this.metricRegistry = checkNotNull(metricRegistry);
- this.numJobManagers = numJobManagers;
-
- LOG.info("Creating JobMaster services");
- this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
- }
-
- // ------------------------------------------------------------------------
- // life cycle
- // ------------------------------------------------------------------------
-
- /**
- * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
- * terminally failed.
- */
- public void shutdown() {
- synchronized (lock) {
- if (!shutdown) {
- shutdown = true;
-
- LOG.info("Shutting down the dispatcher");
-
- // in this shutdown code we copy the references to the stack first,
- // to avoid concurrent modification
-
- JobManagerRunner[] runners = this.runners;
- if (runners != null) {
- this.runners = null;
-
- for (JobManagerRunner runner : runners) {
- runner.shutdown();
- }
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
- // submitting jobs
- // ------------------------------------------------------------------------
-
- /**
- * This method executes a job in detached mode. The method returns immediately after the job
- * has been added to the
- *
- * @param job The Flink job to execute
- *
- * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
- * or if the job terminally failed.
- */
- public void runDetached(JobGraph job) throws JobExecutionException {
- checkNotNull(job);
-
- LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
-
- synchronized (lock) {
- checkState(!shutdown, "mini cluster is shut down");
- checkState(runners == null, "mini cluster can only execute one job at a time");
-
- DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
-
- this.runners = startJobRunners(job, finalizer, finalizer);
- }
- }
-
- /**
- * This method runs a job in blocking mode. The method returns only after the job
- * completed successfully, or after it failed terminally.
- *
- * @param job The Flink job to execute
- * @return The result of the job execution
- *
- * @throws JobExecutionException Thrown if anything went amiss during initial job lauch,
- * or if the job terminally failed.
- */
- public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
- checkNotNull(job);
-
- LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
- final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
-
- synchronized (lock) {
- checkState(!shutdown, "mini cluster is shut down");
- checkState(runners == null, "mini cluster can only execute one job at a time");
-
- this.runners = startJobRunners(job, sync, sync);
- }
-
- try {
- return sync.getResult();
- }
- finally {
- // always clear the status for the next job
- runners = null;
- }
- }
-
- private JobManagerRunner[] startJobRunners(
- JobGraph job,
- OnCompletionActions onCompletion,
- FatalErrorHandler errorHandler) throws JobExecutionException {
- LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
-
- JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
- for (int i = 0; i < numJobManagers; i++) {
- try {
- runners[i] = new JobManagerRunner(job, configuration,
- rpcService, haServices, jobManagerServices, metricRegistry,
- onCompletion, errorHandler);
- runners[i].start();
- }
- catch (Throwable t) {
- // shut down all the ones so far
- for (int k = 0; k <= i; k++) {
- try {
- if (runners[i] != null) {
- runners[i].shutdown();
- }
- } catch (Throwable ignored) {
- // silent shutdown
- }
- }
-
- throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
- }
- }
-
- return runners;
- }
-
- // ------------------------------------------------------------------------
- // test methods to simulate job master failures
- // ------------------------------------------------------------------------
-
-// public void killJobMaster(int which) {
-// checkArgument(which >= 0 && which < numJobManagers, "no such job master");
-// checkState(!shutdown, "mini cluster is shut down");
-//
-// JobManagerRunner[] runners = this.runners;
-// checkState(runners != null, "mini cluster it not executing a job right now");
-//
-// runners[which].shutdown(new Throwable("kill JobManager"));
-// }
-
- // ------------------------------------------------------------------------
- // utility classes
- // ------------------------------------------------------------------------
-
- /**
- * Simple class that waits for all runners to have reported that they are done.
- * In the case of a high-availability test setup, there may be multiple runners.
- * After that, it marks the mini cluster as ready to receive new jobs.
- */
- private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
-
- private final AtomicInteger numJobManagersToWaitFor;
-
- private DetachedFinalizer(int numJobManagersToWaitFor) {
- this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
- }
-
- @Override
- public void jobFinished(JobExecutionResult result) {
- decrementCheckAndCleanup();
- }
-
- @Override
- public void jobFailed(Throwable cause) {
- decrementCheckAndCleanup();
- }
-
- @Override
- public void jobFinishedByOther() {
- decrementCheckAndCleanup();
- }
-
- @Override
- public void onFatalError(Throwable exception) {
- decrementCheckAndCleanup();
- }
-
- private void decrementCheckAndCleanup() {
- if (numJobManagersToWaitFor.decrementAndGet() == 0) {
- MiniClusterJobDispatcher.this.runners = null;
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * This class is used to sync on blocking jobs across multiple runners.
- * Only after all runners reported back that they are finished, the
- * result will be released.
- *
- * That way it is guaranteed that after the blocking job submit call returns,
- * the dispatcher is immediately free to accept another job.
- */
- private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
-
- private final JobID jobId;
-
- private final CountDownLatch jobMastersToWaitFor;
-
- private volatile Throwable jobException;
-
- private volatile Throwable runnerException;
-
- private volatile JobExecutionResult result;
-
- BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
- this.jobId = jobId;
- this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
- }
-
- @Override
- public void jobFinished(JobExecutionResult jobResult) {
- this.result = jobResult;
- jobMastersToWaitFor.countDown();
- }
-
- @Override
- public void jobFailed(Throwable cause) {
- jobException = cause;
- jobMastersToWaitFor.countDown();
- }
-
- @Override
- public void jobFinishedByOther() {
- this.jobMastersToWaitFor.countDown();
- }
-
- @Override
- public void onFatalError(Throwable exception) {
- if (runnerException == null) {
- runnerException = exception;
- }
- }
-
- public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
- jobMastersToWaitFor.await();
-
- final Throwable jobFailureCause = this.jobException;
- final Throwable runnerException = this.runnerException;
- final JobExecutionResult result = this.result;
-
- // (1) we check if the job terminated with an exception
- // (2) we check whether the job completed successfully
- // (3) we check if we have exceptions from the JobManagers. the job may still have
- // completed successfully in that case, if multiple JobMasters were running
- // and other took over. only if all encounter a fatal error, the job cannot finish
-
- if (jobFailureCause != null) {
- if (jobFailureCause instanceof JobExecutionException) {
- throw (JobExecutionException) jobFailureCause;
- }
- else {
- throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
- }
- }
- else if (result != null) {
- return result;
- }
- else if (runnerException != null) {
- throw new JobExecutionException(jobId,
- "The job execution failed because all JobManagers encountered fatal errors", runnerException);
- }
- else {
- throw new IllegalStateException("Bug: Job finished with neither error nor result.");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
new file mode 100644
index 0000000..1ee38e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import akka.actor.ActorSystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.ExceptionUtils;
+
+import scala.Option;
+import scala.Tuple2;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+public class MiniCluster {
+
+ /** The lock to guard startup / shutdown / manipulation methods */
+ private final Object lock = new Object();
+
+ /** The configuration for this mini cluster */
+ private final MiniClusterConfiguration config;
+
+ @GuardedBy("lock")
+ private MetricRegistry metricRegistry;
+
+ @GuardedBy("lock")
+ private RpcService commonRpcService;
+
+ @GuardedBy("lock")
+ private RpcService[] jobManagerRpcServices;
+
+ @GuardedBy("lock")
+ private RpcService[] taskManagerRpcServices;
+
+ @GuardedBy("lock")
+ private HighAvailabilityServices haServices;
+
+ @GuardedBy("lock")
+ private MiniClusterJobDispatcher jobDispatcher;
+
+ /** Flag marking the mini cluster as started/running */
+ @GuardedBy("lock")
+ private boolean running;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new mini cluster with the default configuration:
+ * <ul>
+ * <li>One JobManager</li>
+ * <li>One TaskManager</li>
+ * <li>One task slot in the TaskManager</li>
+ * <li>All components share the same RPC subsystem (minimizes communication overhead)</li>
+ * </ul>
+ */
+ public MiniCluster() {
+ this(new MiniClusterConfiguration());
+ }
+
+ /**
+ * Creates a new mini cluster with the given configuration. The constructor only stores
+ * the configuration; the cluster is brought up by calling {@link #start()}.
+ *
+ * @param config The configuration for the mini cluster, must not be null
+ */
+ public MiniCluster(MiniClusterConfiguration config) {
+ this.config = checkNotNull(config, "config may not be null");
+ }
+
+ /**
+ * Creates a mini cluster based on the given configuration. All components share
+ * a single RPC service.
+ *
+ * @param config The Flink configuration to use; if null, the default mini cluster
+ * configuration is used
+ *
+ * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
+ * @see #MiniCluster(MiniClusterConfiguration)
+ */
+ @Deprecated
+ public MiniCluster(Configuration config) {
+ this(createConfig(config, true));
+ }
+
+ /**
+ * Creates a mini cluster based on the given configuration, starting one or more
+ * RPC services, depending on the given flag.
+ *
+ * @param config The Flink configuration to use; if null, the default mini cluster
+ * configuration is used
+ * @param singleRpcService True, if all components should share one RPC service,
+ * false, if each component should get its own RPC service
+ *
+ * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead.
+ * @see #MiniCluster(MiniClusterConfiguration)
+ */
+ @Deprecated
+ public MiniCluster(Configuration config, boolean singleRpcService) {
+ this(createConfig(config, singleRpcService));
+ }
+
+ // ------------------------------------------------------------------------
+ // life cycle
+ // ------------------------------------------------------------------------
+
+    /**
+     * Checks if the mini cluster was started and is running.
+     *
+     * <p>The flag is guarded by the cluster lock, so it is read while holding that lock.
+     * This makes the value written by a {@link #start()} or {@link #shutdown()} call on
+     * another thread visible to the caller.
+     *
+     * @return True, if the mini cluster is running, false otherwise.
+     */
+    public boolean isRunning() {
+        synchronized (lock) {
+            return running;
+        }
+    }
+
+    /**
+     * Starts the mini cluster, based on the configured properties.
+     *
+     * <p>The components are brought up in this order: the metric registry, the RPC
+     * service(s), the high-availability services, and the job dispatcher. If any of these
+     * steps fails with an exception, everything that was started so far is shut down again
+     * and the original exception is re-thrown; exceptions from that cleanup are attached
+     * to it as suppressed exceptions.
+     *
+     * @throws Exception This method passes on any exception that occurs during the startup of
+     *                   the mini cluster.
+     */
+    public void start() throws Exception {
+        synchronized (lock) {
+            checkState(!running, "FlinkMiniCluster is already running");
+
+            final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
+            final Time rpcTimeout = config.getRpcTimeout();
+            final int numJobManagers = config.getNumJobManagers();
+            final int numTaskManagers = config.getNumTaskManagers();
+            final boolean singleRpc = config.getUseSingleRpcSystem();
+
+            try {
+                metricRegistry = createMetricRegistry(configuration);
+
+                RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
+                RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
+
+                // bring up all the RPC services
+                if (singleRpc) {
+                    // one common RPC for all
+                    commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+
+                    // set that same RPC service for all JobManagers and TaskManagers.
+                    // the arrays are deliberately not stored in the fields in this mode, so
+                    // that the shared service is stopped only once during shutdown
+                    for (int i = 0; i < numJobManagers; i++) {
+                        jobManagerRpcServices[i] = commonRpcService;
+                    }
+                    for (int i = 0; i < numTaskManagers; i++) {
+                        taskManagerRpcServices[i] = commonRpcService;
+                    }
+                }
+                else {
+                    // start a new service per component, possibly with custom bind addresses
+                    final String jobManagerBindAddress = config.getJobManagerBindAddress();
+                    final String taskManagerBindAddress = config.getTaskManagerBindAddress();
+
+                    // store the arrays in the fields before filling them: if starting one of
+                    // the services fails, the cleanup below then also stops the services that
+                    // were already started (slots that are still null are skipped there)
+                    this.jobManagerRpcServices = jobManagerRpcServices;
+                    this.taskManagerRpcServices = taskManagerRpcServices;
+
+                    for (int i = 0; i < numJobManagers; i++) {
+                        jobManagerRpcServices[i] = createRpcService(
+                                configuration, rpcTimeout, true, jobManagerBindAddress);
+                    }
+
+                    for (int i = 0; i < numTaskManagers; i++) {
+                        taskManagerRpcServices[i] = createRpcService(
+                                configuration, rpcTimeout, true, taskManagerBindAddress);
+                    }
+                }
+
+                // create the high-availability services
+                haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+
+                // bring up the dispatcher that launches JobManagers when jobs submitted
+                jobDispatcher = new MiniClusterJobDispatcher(
+                        configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
+            }
+            catch (Exception e) {
+                // cleanup everything
+                try {
+                    shutdownInternally();
+                } catch (Exception ee) {
+                    e.addSuppressed(ee);
+                }
+                throw e;
+            }
+
+            // now officially mark this as running
+            running = true;
+        }
+    }
+
+    /**
+     * Shuts down the mini cluster, failing all currently executing jobs.
+     * After the shutdown, the mini cluster can be brought up again via {@link #start()}.
+     *
+     * <p>All started services and components are shut down, even if shutting down
+     * one of them fails with an exception. Calling this method on a mini cluster that
+     * is not running has no effect.
+     *
+     * @throws Exception Thrown, if the shutdown did not complete cleanly.
+     */
+    public void shutdown() throws Exception {
+        synchronized (lock) {
+            if (!running) {
+                // not started, or already shut down
+                return;
+            }
+
+            try {
+                shutdownInternally();
+            }
+            finally {
+                // the cluster counts as stopped even if the shutdown was not clean
+                running = false;
+            }
+        }
+    }
+
+    /**
+     * Shuts down every component that is currently set and resets its field to null:
+     * the job dispatcher, the high-availability services, the RPC services, and the
+     * metric registry, in that order.
+     *
+     * <p>A failure while shutting down one component does not prevent the shutdown of
+     * the remaining ones. The first exception is kept, and all later exceptions are
+     * added to it as suppressed exceptions.
+     *
+     * <p>This method must be called while holding the cluster lock.
+     *
+     * @throws Exception Thrown, if at least one component did not shut down cleanly.
+     */
+    @GuardedBy("lock")
+    private void shutdownInternally() throws Exception {
+        // this should always be called under the lock
+        assert Thread.holdsLock(lock);
+
+        // collect the first exception, but continue and add all successive
+        // exceptions as suppressed
+        Throwable exception = null;
+
+        // cancel all jobs and shut down the job dispatcher
+        if (jobDispatcher != null) {
+            try {
+                jobDispatcher.shutdown();
+            } catch (Exception e) {
+                exception = firstOrSuppressed(e, exception);
+            }
+            jobDispatcher = null;
+        }
+
+        // shut down high-availability services
+        if (haServices != null) {
+            try {
+                haServices.shutdown();
+            } catch (Exception e) {
+                exception = firstOrSuppressed(e, exception);
+            }
+            haServices = null;
+        }
+
+        // shut down the RpcServices
+        if (commonRpcService != null) {
+            exception = shutDownRpc(commonRpcService, exception);
+            commonRpcService = null;
+        }
+        if (jobManagerRpcServices != null) {
+            for (RpcService service : jobManagerRpcServices) {
+                exception = shutDownRpc(service, exception);
+            }
+            jobManagerRpcServices = null;
+        }
+        if (taskManagerRpcServices != null) {
+            for (RpcService service : taskManagerRpcServices) {
+                exception = shutDownRpc(service, exception);
+            }
+            taskManagerRpcServices = null;
+        }
+
+        // metrics shutdown. guarded like the other components, so that a failure here
+        // neither hides the exceptions collected above nor leaves the field set
+        if (metricRegistry != null) {
+            try {
+                metricRegistry.shutdown();
+            } catch (Exception e) {
+                exception = firstOrSuppressed(e, exception);
+            }
+            metricRegistry = null;
+        }
+
+        // if anything went wrong, throw the first error with all the additional suppressed exceptions
+        if (exception != null) {
+            ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster");
+        }
+    }
+
+ // ------------------------------------------------------------------------
+ // running jobs
+ // ------------------------------------------------------------------------
+
+ /**
+ * This method executes a job in detached mode. The method returns immediately after the job
+ * has been handed to the job dispatcher.
+ *
+ * @param job The Flink job to execute, must not be null
+ *
+ * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+ * or if the job terminally failed.
+ */
+ public void runDetached(JobGraph job) throws JobExecutionException {
+ checkNotNull(job, "job is null");
+
+ // the dispatcher is used while holding the lock, so a concurrent shutdown()
+ // cannot clear the dispatcher field in between the check and the call
+ synchronized (lock) {
+ checkState(running, "mini cluster is not running");
+ jobDispatcher.runDetached(job);
+ }
+ }
+
+ /**
+ * This method runs a job in blocking mode. The method returns only after the job
+ * completed successfully, or after it failed terminally.
+ *
+ * @param job The Flink job to execute, must not be null
+ * @return The result of the job execution
+ *
+ * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+ * or if the job terminally failed.
+ * @throws InterruptedException Declared because the dispatcher's blocking call declares it.
+ */
+ public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+ checkNotNull(job, "job is null");
+
+ // only the state check and the read of the dispatcher reference happen under the lock
+ MiniClusterJobDispatcher dispatcher;
+ synchronized (lock) {
+ checkState(running, "mini cluster is not running");
+ dispatcher = this.jobDispatcher;
+ }
+
+ // the blocking call is made outside the synchronized block, so the lock is not
+ // held while the job runs
+ return dispatcher.runJobBlocking(job);
+ }
+
+ // ------------------------------------------------------------------------
+ // factories - can be overridden by subclasses to alter behavior
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory method to create the metric registry for the mini cluster.
+ *
+ * @param config The configuration of the mini cluster
+ * @return A new metric registry, set up from the given configuration
+ */
+ protected MetricRegistry createMetricRegistry(Configuration config) {
+ return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+ }
+
+    /**
+     * Factory method to instantiate the RPC service. The service is backed by a newly
+     * created actor system, which is either a local one, or one that accepts remote
+     * connections on the given bind address.
+     *
+     * @param config
+     *            The configuration of the mini cluster
+     * @param askTimeout
+     *            The default RPC timeout for asynchronous "ask" requests.
+     * @param remoteEnabled
+     *            True, if the RPC service should be reachable from other (remote) RPC services.
+     * @param bindAddress
+     *            The address to bind the RPC service to. Only relevant when "remoteEnabled" is true.
+     *
+     * @return The instantiated RPC service
+     */
+    protected RpcService createRpcService(
+            Configuration config,
+            Time askTimeout,
+            boolean remoteEnabled,
+            String bindAddress) {
+
+        if (!remoteEnabled) {
+            // purely local actor system, the bind address is not used
+            return new AkkaRpcService(AkkaUtils.createLocalActorSystem(config), askTimeout);
+        }
+
+        // NOTE(review): port 0 presumably lets the actor system pick a free port - confirm
+        final Tuple2<String, Object> addressAndPort = new Tuple2<String, Object>(bindAddress, 0);
+        final ActorSystem remoteSystem = AkkaUtils.createActorSystem(config, Option.apply(addressAndPort));
+
+        return new AkkaRpcService(remoteSystem, askTimeout);
+    }
+
+ // ------------------------------------------------------------------------
+ // miscellaneous utilities
+ // ------------------------------------------------------------------------
+
+    /**
+     * Stops the given RPC service, if it is not null, and folds a failure of that stop
+     * into the exception collected so far.
+     *
+     * @param rpcService The RPC service to stop, may be null
+     * @param priorException The exception collected so far, may be null
+     * @return The prior exception, if stopping succeeded or there was nothing to stop;
+     *         otherwise the result of combining the new failure with the prior exception
+     */
+    private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
+        if (rpcService == null) {
+            return priorException;
+        }
+
+        try {
+            rpcService.stopService();
+        }
+        catch (Throwable t) {
+            return firstOrSuppressed(t, priorException);
+        }
+
+        return priorException;
+    }
+
+    /**
+     * Builds the mini cluster configuration for the deprecated constructors.
+     *
+     * @param cfg The Flink configuration to wrap; if null, a default mini cluster
+     *            configuration is created
+     * @param singleActorSystem False, if each component should use its own RPC service;
+     *                          true leaves the configuration's RPC setting untouched
+     * @return The mini cluster configuration
+     */
+    private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleActorSystem) {
+        final MiniClusterConfiguration result;
+        if (cfg != null) {
+            result = new MiniClusterConfiguration(cfg);
+        } else {
+            result = new MiniClusterConfiguration();
+        }
+
+        if (!singleActorSystem) {
+            result.setUseRpcServicePerComponent();
+        }
+
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
new file mode 100644
index 0000000..a8d7b10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Mutable holder for the settings of a mini cluster: the underlying Flink
+ * {@link Configuration}, the number of JobManagers and TaskManagers, the RPC service
+ * mode, and an optional bind address shared by all components.
+ */
+public class MiniClusterConfiguration {
+
+    /** The Flink configuration; the constructors always store a fresh instance or a copy */
+    private final Configuration config;
+
+    /** Toggled by {@link #setUseSingleRpcService()} and {@link #setUseRpcServicePerComponent()} */
+    private boolean singleRpcService = true;
+
+    /** Number of JobManagers, at least one */
+    private int numJobManagers = 1;
+
+    /** Number of TaskManagers, at least one */
+    private int numTaskManagers = 1;
+
+    /** Bind address that takes precedence over the configured addresses; null while unset */
+    private String commonBindAddress;
+
+    // ------------------------------------------------------------------------
+    //  Construction
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a mini cluster configuration backed by an empty Flink configuration.
+     */
+    public MiniClusterConfiguration() {
+        this.config = new Configuration();
+    }
+
+    /**
+     * Creates a mini cluster configuration backed by a copy of the given Flink configuration.
+     *
+     * @param config The configuration to copy, must not be null.
+     */
+    public MiniClusterConfiguration(Configuration config) {
+        this.config = new Configuration(checkNotNull(config));
+    }
+
+    // ------------------------------------------------------------------------
+    //  setters
+    // ------------------------------------------------------------------------
+
+    /**
+     * Adds all entries of the given configuration to this mini cluster's configuration.
+     *
+     * @param config The configuration whose entries are added, must not be null.
+     */
+    public void addConfiguration(Configuration config) {
+        this.config.addAll(checkNotNull(config, "configuration must not be null"));
+    }
+
+    /** Selects the single-RPC-service mode (the initial setting). */
+    public void setUseSingleRpcService() {
+        singleRpcService = true;
+    }
+
+    /** Selects the mode with one RPC service per component. */
+    public void setUseRpcServicePerComponent() {
+        singleRpcService = false;
+    }
+
+    /**
+     * Sets the number of JobManagers.
+     *
+     * @param numJobManagers The number of JobManagers, must be at least one.
+     */
+    public void setNumJobManagers(int numJobManagers) {
+        checkArgument(numJobManagers >= 1, "must have at least one JobManager");
+        this.numJobManagers = numJobManagers;
+    }
+
+    /**
+     * Sets the number of TaskManagers.
+     *
+     * @param numTaskManagers The number of TaskManagers, must be at least one.
+     */
+    public void setNumTaskManagers(int numTaskManagers) {
+        checkArgument(numTaskManagers >= 1, "must have at least one TaskManager");
+        this.numTaskManagers = numTaskManagers;
+    }
+
+    /**
+     * Sets the number of slots per TaskManager. The value is written into the
+     * Flink configuration rather than kept in a separate field.
+     *
+     * @param numTaskSlots The number of slots per TaskManager, must be at least one.
+     */
+    public void setNumTaskManagerSlots(int numTaskSlots) {
+        checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
+        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
+    }
+
+    /**
+     * Sets one bind address that is returned for both JobManagers and TaskManagers.
+     *
+     * @param bindAddress The common bind address, must not be null.
+     */
+    public void setCommonRpcBindAddress(String bindAddress) {
+        this.commonBindAddress = checkNotNull(bindAddress, "bind address must not be null");
+    }
+
+    // ------------------------------------------------------------------------
+    //  getters
+    // ------------------------------------------------------------------------
+
+    /**
+     * @return The Flink configuration held by this object (the internal instance, not a copy).
+     */
+    public Configuration getConfiguration() {
+        return config;
+    }
+
+    /**
+     * @return True if the single-RPC-service mode is selected, false if the
+     *         per-component mode is selected.
+     */
+    public boolean getUseSingleRpcSystem() {
+        return singleRpcService;
+    }
+
+    /**
+     * @return The configured number of JobManagers.
+     */
+    public int getNumJobManagers() {
+        return numJobManagers;
+    }
+
+    /**
+     * @return The configured number of TaskManagers.
+     */
+    public int getNumTaskManagers() {
+        return numTaskManagers;
+    }
+
+    /**
+     * @return The number of slots per TaskManager as stored in the Flink configuration,
+     *         or 1 if the configuration has no such entry.
+     */
+    public int getNumSlotsPerTaskManager() {
+        return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+    }
+
+    /**
+     * @return The common bind address if one was set, otherwise the JobManager address
+     *         from the Flink configuration, falling back to "localhost".
+     */
+    public String getJobManagerBindAddress() {
+        if (commonBindAddress != null) {
+            return commonBindAddress;
+        }
+        return config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+    }
+
+    /**
+     * @return The common bind address if one was set, otherwise the TaskManager hostname
+     *         from the Flink configuration, falling back to "localhost".
+     */
+    public String getTaskManagerBindAddress() {
+        if (commonBindAddress != null) {
+            return commonBindAddress;
+        }
+        return config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+    }
+
+    /**
+     * @return The timeout obtained from {@code AkkaUtils.getTimeout} for the held
+     *         configuration, converted to a {@link Time}.
+     */
+    public Time getRpcTimeout() {
+        final FiniteDuration timeout = AkkaUtils.getTimeout(config);
+        return Time.of(timeout.length(), timeout.unit());
+    }
+
+    // ------------------------------------------------------------------------
+    //  utils
+    // ------------------------------------------------------------------------
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("MiniClusterConfiguration{");
+        text.append("singleRpcService=").append(singleRpcService);
+        text.append(", numJobManagers=").append(numJobManagers);
+        text.append(", numTaskManagers=").append(numTaskManagers);
+        text.append(", commonBindAddress='").append(commonBindAddress).append('\'');
+        text.append(", config=").append(config);
+        text.append('}');
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..d99eff6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
+ * upon receiving jobs.
+ */
+public class MiniClusterJobDispatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+ // ------------------------------------------------------------------------
+
+ /** lock to ensure that this dispatcher executes only one job at a time */
+ private final Object lock = new Object();
+
+ /** the configuration with which the mini cluster was started */
+ private final Configuration configuration;
+
+ /** the RPC services to use by the job managers */
+ private final RpcService[] rpcServices;
+
+ /** services for discovery, leader election, and recovery */
+ private final HighAvailabilityServices haServices;
+
+ /** al the services that the JobManager needs, such as BLOB service, factories, etc */
+ private final JobManagerServices jobManagerServices;
+
+ /** Registry for all metrics in the mini cluster */
+ private final MetricRegistry metricRegistry;
+
+ /** The number of JobManagers to launch (more than one simulates a high-availability setup) */
+ private final int numJobManagers;
+
+ /** The runner for the job and master. non-null if a job is currently running */
+ private volatile JobManagerRunner[] runners;
+
+ /** flag marking the dispatcher as hut down */
+ private volatile boolean shutdown;
+
+
+ /**
+ * Starts a mini cluster job dispatcher.
+ *
+ * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
+ * non-highly-available setup.
+ *
+ * @param config The configuration of the mini cluster
+ * @param haServices Access to the discovery, leader election, and recovery services
+ *
+ * @throws Exception Thrown, if the services for the JobMaster could not be started.
+ */
+ public MiniClusterJobDispatcher(
+ Configuration config,
+ RpcService rpcService,
+ HighAvailabilityServices haServices,
+ MetricRegistry metricRegistry) throws Exception {
+ this(config, haServices, metricRegistry, 1, new RpcService[] { rpcService });
+ }
+
+ /**
+ * Starts a mini cluster job dispatcher.
+ *
+ * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
+ * a highly-available setup.
+ *
+ * @param config The configuration of the mini cluster
+ * @param haServices Access to the discovery, leader election, and recovery services
+ * @param numJobManagers The number of JobMasters to start for each job.
+ *
+ * @throws Exception Thrown, if the services for the JobMaster could not be started.
+ */
+ public MiniClusterJobDispatcher(
+ Configuration config,
+ HighAvailabilityServices haServices,
+ MetricRegistry metricRegistry,
+ int numJobManagers,
+ RpcService[] rpcServices) throws Exception {
+
+ checkArgument(numJobManagers >= 1);
+ checkArgument(rpcServices.length == numJobManagers);
+
+ this.configuration = checkNotNull(config);
+ this.rpcServices = rpcServices;
+ this.haServices = checkNotNull(haServices);
+ this.metricRegistry = checkNotNull(metricRegistry);
+ this.numJobManagers = numJobManagers;
+
+ LOG.info("Creating JobMaster services");
+ this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
+ }
+
+ // ------------------------------------------------------------------------
+ // life cycle
+ // ------------------------------------------------------------------------
+
+ /**
+ * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
+ * terminally failed.
+ */
+ public void shutdown() {
+ synchronized (lock) {
+ if (!shutdown) {
+ shutdown = true;
+
+ LOG.info("Shutting down the dispatcher");
+
+ // in this shutdown code we copy the references to the stack first,
+ // to avoid concurrent modification
+
+ JobManagerRunner[] runners = this.runners;
+ if (runners != null) {
+ this.runners = null;
+
+ for (JobManagerRunner runner : runners) {
+ runner.shutdown();
+ }
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // submitting jobs
+ // ------------------------------------------------------------------------
+
+ /**
+ * This method executes a job in detached mode. The method returns immediately after the job
+ * has been added to the
+ *
+ * @param job The Flink job to execute
+ *
+ * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+ * or if the job terminally failed.
+ */
+ public void runDetached(JobGraph job) throws JobExecutionException {
+ checkNotNull(job);
+
+ LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+
+ synchronized (lock) {
+ checkState(!shutdown, "mini cluster is shut down");
+ checkState(runners == null, "mini cluster can only execute one job at a time");
+
+ DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
+
+ this.runners = startJobRunners(job, finalizer, finalizer);
+ }
+ }
+
+ /**
+ * This method runs a job in blocking mode. The method returns only after the job
+ * completed successfully, or after it failed terminally.
+ *
+ * @param job The Flink job to execute
+ * @return The result of the job execution
+ *
+ * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+ * or if the job terminally failed.
+ */
+ public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+ checkNotNull(job);
+
+ LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+ final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
+
+ synchronized (lock) {
+ checkState(!shutdown, "mini cluster is shut down");
+ checkState(runners == null, "mini cluster can only execute one job at a time");
+
+ this.runners = startJobRunners(job, sync, sync);
+ }
+
+ try {
+ return sync.getResult();
+ }
+ finally {
+ // always clear the status for the next job
+ runners = null;
+ }
+ }
+
+ private JobManagerRunner[] startJobRunners(
+ JobGraph job,
+ OnCompletionActions onCompletion,
+ FatalErrorHandler errorHandler) throws JobExecutionException {
+
+ LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+ // we first need to mark the job as running in the HA services, so that the
+ // JobManager leader will recognize that it as work to do
+ try {
+ haServices.getRunningJobsRegistry().setJobRunning(job.getJobID());
+ }
+ catch (Throwable t) {
+ throw new JobExecutionException(job.getJobID(),
+ "Could not register the job at the high-availability services", t);
+ }
+
+ // start all JobManagers
+ JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+ for (int i = 0; i < numJobManagers; i++) {
+ try {
+ runners[i] = new JobManagerRunner(job, configuration,
+ rpcServices[i], haServices, jobManagerServices, metricRegistry,
+ onCompletion, errorHandler);
+ runners[i].start();
+ }
+ catch (Throwable t) {
+ // shut down all the ones so far
+ for (int k = 0; k <= i; k++) {
+ try {
+ if (runners[i] != null) {
+ runners[i].shutdown();
+ }
+ } catch (Throwable ignored) {
+ // silent shutdown
+ }
+ }
+
+ // un-register the job from the high.availability services
+ try {
+ haServices.getRunningJobsRegistry().setJobFinished(job.getJobID());
+ }
+ catch (Throwable tt) {
+ LOG.warn("Could not properly unregister job from high-availability services", tt);
+ }
+
+ throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+ }
+ }
+
+ return runners;
+ }
+
+ // ------------------------------------------------------------------------
+ // test methods to simulate job master failures
+ // ------------------------------------------------------------------------
+
+// public void killJobMaster(int which) {
+// checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+// checkState(!shutdown, "mini cluster is shut down");
+//
+// JobManagerRunner[] runners = this.runners;
+// checkState(runners != null, "mini cluster it not executing a job right now");
+//
+// runners[which].shutdown(new Throwable("kill JobManager"));
+// }
+
+ // ------------------------------------------------------------------------
+ // utility classes
+ // ------------------------------------------------------------------------
+
+ /**
+ * Simple class that waits for all runners to have reported that they are done.
+ * In the case of a high-availability test setup, there may be multiple runners.
+ * After that, it marks the mini cluster as ready to receive new jobs.
+ */
+ private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
+
+ private final AtomicInteger numJobManagersToWaitFor;
+
+ private DetachedFinalizer(int numJobManagersToWaitFor) {
+ this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
+ }
+
+ @Override
+ public void jobFinished(JobExecutionResult result) {
+ decrementCheckAndCleanup();
+ }
+
+ @Override
+ public void jobFailed(Throwable cause) {
+ decrementCheckAndCleanup();
+ }
+
+ @Override
+ public void jobFinishedByOther() {
+ decrementCheckAndCleanup();
+ }
+
+ @Override
+ public void onFatalError(Throwable exception) {
+ decrementCheckAndCleanup();
+ }
+
+ private void decrementCheckAndCleanup() {
+ if (numJobManagersToWaitFor.decrementAndGet() == 0) {
+ MiniClusterJobDispatcher.this.runners = null;
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * This class is used to sync on blocking jobs across multiple runners.
+ * Only after all runners reported back that they are finished, the
+ * result will be released.
+ *
+ * That way it is guaranteed that after the blocking job submit call returns,
+ * the dispatcher is immediately free to accept another job.
+ */
+ private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
+
+ private final JobID jobId;
+
+ private final CountDownLatch jobMastersToWaitFor;
+
+ private volatile Throwable jobException;
+
+ private volatile Throwable runnerException;
+
+ private volatile JobExecutionResult result;
+
+ BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+ this.jobId = jobId;
+ this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
+ }
+
+ @Override
+ public void jobFinished(JobExecutionResult jobResult) {
+ this.result = jobResult;
+ jobMastersToWaitFor.countDown();
+ }
+
+ @Override
+ public void jobFailed(Throwable cause) {
+ jobException = cause;
+ jobMastersToWaitFor.countDown();
+ }
+
+ @Override
+ public void jobFinishedByOther() {
+ this.jobMastersToWaitFor.countDown();
+ }
+
+ @Override
+ public void onFatalError(Throwable exception) {
+ if (runnerException == null) {
+ runnerException = exception;
+ }
+ }
+
+ public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
+ jobMastersToWaitFor.await();
+
+ final Throwable jobFailureCause = this.jobException;
+ final Throwable runnerException = this.runnerException;
+ final JobExecutionResult result = this.result;
+
+ // (1) we check if the job terminated with an exception
+ // (2) we check whether the job completed successfully
+ // (3) we check if we have exceptions from the JobManagers. the job may still have
+ // completed successfully in that case, if multiple JobMasters were running
+ // and other took over. only if all encounter a fatal error, the job cannot finish
+
+ if (jobFailureCause != null) {
+ if (jobFailureCause instanceof JobExecutionException) {
+ throw (JobExecutionException) jobFailureCause;
+ }
+ else {
+ throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
+ }
+ }
+ else if (result != null) {
+ return result;
+ }
+ else if (runnerException != null) {
+ throw new JobExecutionException(jobId,
+ "The job execution failed because all JobManagers encountered fatal errors", runnerException);
+ }
+ else {
+ throw new IllegalStateException("Bug: Job finished with neither error nor result.");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6f6d525..3122804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -188,7 +188,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
} else {
try {
LeaderRetrievalService jobMasterLeaderRetriever =
- highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress);
+ highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 9e71349..e7f52e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -191,8 +191,7 @@ public class JobLeaderService {
LOG.info("Add job {} for job leader monitoring.", jobId);
final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
- jobId,
- defaultTargetAddress);
+ jobId);
JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3e88e8c..877812b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
if (service != null) {
return service;
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
new file mode 100644
index 0000000..dd43337
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+/**
+ * Integration test cases for the {@link MiniCluster}.
+ *
+ * <p>The {@code @Test} annotations on the test methods are currently commented out,
+ * so the methods are not annotated as tests.
+ */
+public class MiniClusterITCase extends TestLogger {
+
+//  @Test
+    public void runJobWithSingleRpcService() throws Exception {
+        final MiniClusterConfiguration config = new MiniClusterConfiguration();
+
+        // the single RPC service mode should be the default; it is set explicitly
+        // so that the test stays valid when the default changes
+        config.setUseSingleRpcService();
+
+        executeJob(new MiniCluster(config));
+    }
+
+//  @Test
+    public void runJobWithMultipleRpcServices() throws Exception {
+        final MiniClusterConfiguration config = new MiniClusterConfiguration();
+        config.setUseRpcServicePerComponent();
+
+        executeJob(new MiniCluster(config));
+    }
+
+//  @Test
+    public void runJobWithMultipleJobManagers() throws Exception {
+        final MiniClusterConfiguration config = new MiniClusterConfiguration();
+        config.setNumJobManagers(3);
+
+        executeJob(new MiniCluster(config));
+    }
+
+    /**
+     * Starts the given mini cluster and runs the simple test job on it in blocking mode.
+     */
+    private static void executeJob(MiniCluster miniCluster) throws Exception {
+        miniCluster.start();
+        miniCluster.runJobBlocking(getSimpleJob());
+    }
+
+    /**
+     * Creates a job with a single no-op vertex of parallelism 1 (max parallelism 1).
+     */
+    private static JobGraph getSimpleJob() {
+        final JobVertex vertex = new JobVertex("Test task");
+        vertex.setParallelism(1);
+        vertex.setMaxParallelism(1);
+        vertex.setInvokableClass(NoOpInvokable.class);
+
+        final JobID jobId = new JobID();
+        return new JobGraph(jobId, "Test Job", vertex);
+    }
+}