You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/28 08:21:27 UTC
[44/50] [abbrv] flink git commit: [FLINK-4408] [JobManager] Introduce
JobMasterRunner and implement job submission & setting up the ExecutionGraph
[FLINK-4408] [JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph
This closes #2480
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c4814c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c4814c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c4814c
Branch: refs/heads/flip-6
Commit: d9c4814c96653bf885e41e1e023e2d8dd6688a76
Parents: 073f608
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Sep 8 12:00:13 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 19:25:01 2016 +0200
----------------------------------------------------------------------
.../flink/api/common/JobExecutionResult.java | 2 +-
.../flink/api/common/JobSubmissionResult.java | 2 +-
.../HighAvailabilityServices.java | 12 +
.../runtime/highavailability/NonHaServices.java | 16 +-
.../runtime/jobmanager/OnCompletionActions.java | 31 ++
.../runtime/jobmanager/scheduler/Scheduler.java | 9 +
.../runtime/jobmaster/JobManagerRunner.java | 288 +++++++++++
.../runtime/jobmaster/JobManagerServices.java | 73 +++
.../flink/runtime/jobmaster/JobMaster.java | 485 ++++++++++++++-----
.../runtime/jobmaster/JobMasterGateway.java | 13 +
.../jobmaster/MiniClusterJobDispatcher.java | 385 +++++++++++++++
.../flink/runtime/rpc/FatalErrorHandler.java | 24 +
.../runtime/taskexecutor/TaskExecutor.java | 12 +
.../TestingHighAvailabilityServices.java | 39 +-
.../jobmaster/JobManagerRunnerMockTest.java | 254 ++++++++++
.../flink/runtime/rpc/RpcConnectionTest.java | 17 +-
16 files changed, 1533 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index cb4ecc5..7286cc5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
@Public
public class JobExecutionResult extends JobSubmissionResult {
- private long netRuntime;
+ private final long netRuntime;
private final Map<String, Object> accumulatorResults;
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
index c5dc869..b0e7e24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.annotation.Public;
@Public
public class JobSubmissionResult {
- private JobID jobID;
+ private final JobID jobID;
public JobSubmissionResult(JobID jobID) {
this.jobID = jobID;
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 7634176..d67e927 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -61,4 +63,14 @@ public interface HighAvailabilityServices {
* @param jobID The identifier of the job running the election.
*/
LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception;
+
+ /**
+ * Gets the checkpoint recovery factory for the job manager.
+ *
+ * @return the {@link CheckpointRecoveryFactory} provided by these services
+ * @throws Exception declared for implementations; the failure conditions are implementation specific
+ */
+ CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception;
+
+ /**
+ * Gets the submitted job graph store for the job manager.
+ *
+ * @return the {@link SubmittedJobGraphStore} provided by these services
+ * @throws Exception declared for implementations; the failure conditions are implementation specific
+ */
+ SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 33dc2d7..a2c9cc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,13 +19,17 @@
package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,4 +92,14 @@ public class NonHaServices implements HighAvailabilityServices {
public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
return new StandaloneLeaderElectionService();
}
+
+ /**
+ * Returns a new {@link StandaloneCheckpointRecoveryFactory} on every call.
+ */
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+ return new StandaloneCheckpointRecoveryFactory();
+ }
+
+ /**
+ * Returns a new {@link StandaloneSubmittedJobGraphStore} on every call.
+ */
+ @Override
+ public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+ return new StandaloneSubmittedJobGraphStore();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
new file mode 100644
index 0000000..6de4253
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+
+/**
+ * Callbacks through which the outcome of a job is reported. The interface extends
+ * {@link FatalErrorHandler}, so implementations also handle fatal error notifications.
+ */
+public interface OnCompletionActions extends FatalErrorHandler {
+
+ /**
+ * Notification that the job finished.
+ *
+ * @param result the execution result of the finished job
+ */
+ void jobFinished(JobExecutionResult result);
+
+ /**
+ * Notification that the job failed.
+ *
+ * @param cause the cause of the failure
+ */
+ void jobFailed(Throwable cause);
+
+ /**
+ * Notification that the job was already finished by another party
+ * (presumably a different job manager instance - TODO confirm).
+ */
+ void jobFinishedByOther();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b839e0e..aa09314 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -31,6 +31,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import akka.dispatch.Futures;
@@ -57,6 +58,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
/**
* The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
@@ -110,6 +112,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
/**
* Creates a new scheduler.
*/
+ public Scheduler(ExecutorService executor) {
+ // Wrap the Java executor into a Scala ExecutionContext and delegate to the main constructor.
+ this(ExecutionContext$.MODULE$.fromExecutor(executor));
+ }
+
+ /**
+ * Creates a new scheduler.
+ */
public Scheduler(ExecutionContext executionContext) {
this.executionContext = executionContext;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
new file mode 100644
index 0000000..bc2bf9a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+/**
+ * The runner for the job manager. It takes part in the job-level leader election and starts,
+ * suspends or shuts down the underlying {@link JobMaster} in reaction to leadership changes
+ * and job completion notifications.
+ *
+ * <p>Completion and fatal-error notifications are forwarded to the {@link OnCompletionActions}
+ * handed in at construction time, if one was given.
+ */
+public class JobManagerRunner implements LeaderContender, OnCompletionActions {
+
+    private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);
+
+    /** Lock guarding shutdown and the handling of leadership grant/revoke events */
+    private final Object lock = new Object();
+
+    /** The job graph that is to be run */
+    private final JobGraph jobGraph;
+
+    /** Receiver of the forwarded completion / fatal error notifications, may be null */
+    private final OnCompletionActions toNotify;
+
+    /** The execution context which is used to execute futures */
+    private final Executor executionContext;
+
+    // TODO: use this to decide whether the job is finished by other
+    private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+    /** Leader election for this job */
+    private final LeaderElectionService leaderElectionService;
+
+    /** The job master that is driven by this runner */
+    private final JobMaster jobManager;
+
+    /** Leader session id while leadership is held, null otherwise; changed only under the lock */
+    private UUID leaderSessionID;
+
+    /** Flag marking the runner as shut down */
+    private volatile boolean shutdown;
+
+    /**
+     * Creates a runner whose auxiliary services are obtained from
+     * {@link JobManagerServices#fromConfiguration(Configuration)}.
+     *
+     * @param jobGraph the job to run
+     * @param configuration the configuration passed on to the job master
+     * @param rpcService the RPC service used by the job master
+     * @param haServices provider of the leader election service and the checkpoint recovery factory
+     * @param toNotify receiver of completion and fatal error notifications, may be null
+     * @throws Exception if one of the services or the job master cannot be created
+     */
+    public JobManagerRunner(
+        final JobGraph jobGraph,
+        final Configuration configuration,
+        final RpcService rpcService,
+        final HighAvailabilityServices haServices,
+        final OnCompletionActions toNotify) throws Exception
+    {
+        this(jobGraph, configuration, rpcService, haServices,
+            JobManagerServices.fromConfiguration(configuration), toNotify);
+    }
+
+    /**
+     * Creates a runner that uses the given auxiliary services.
+     *
+     * @param jobGraph the job to run
+     * @param configuration the configuration passed on to the job master
+     * @param rpcService the RPC service used by the job master
+     * @param haServices provider of the leader election service and the checkpoint recovery factory
+     * @param jobManagerServices the auxiliary services handed to the job master
+     * @param toNotify receiver of completion and fatal error notifications, may be null
+     * @throws Exception if one of the HA services or the job master cannot be created
+     */
+    public JobManagerRunner(
+        final JobGraph jobGraph,
+        final Configuration configuration,
+        final RpcService rpcService,
+        final HighAvailabilityServices haServices,
+        final JobManagerServices jobManagerServices,
+        final OnCompletionActions toNotify) throws Exception
+    {
+        this.jobGraph = jobGraph;
+        this.toNotify = toNotify;
+        this.executionContext = rpcService.getExecutor();
+        this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory();
+        this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
+        this.leaderSessionID = null;
+
+        // The job master reports completion and fatal errors back to this runner.
+        this.jobManager = new JobMaster(
+            jobGraph, configuration, rpcService, haServices,
+            jobManagerServices.libraryCacheManager,
+            jobManagerServices.restartStrategyFactory,
+            jobManagerServices.savepointStore,
+            jobManagerServices.timeout,
+            new Scheduler(jobManagerServices.executorService),
+            jobManagerServices.jobManagerMetricGroup,
+            this);
+    }
+
+    //----------------------------------------------------------------------------------------------
+    // Lifecycle management
+    //----------------------------------------------------------------------------------------------
+
+    /**
+     * Initializes and starts the job master, then starts the leader election service with this
+     * runner as the contender.
+     *
+     * @throws Exception if the job master cannot be initialized or started, or if the leader
+     *                   election service fails to start
+     */
+    public void start() throws Exception {
+        jobManager.init();
+        jobManager.start();
+
+        try {
+            leaderElectionService.start(this);
+        }
+        catch (Exception e) {
+            log.error("Could not start the JobManager because the leader election service did not start.", e);
+            throw new Exception("Could not start the leader election service.", e);
+        }
+    }
+
+    /**
+     * Shuts down this runner, the leader election service and the job master.
+     */
+    public void shutdown() {
+        shutdown(new Exception("The JobManager runner is shutting down"));
+    }
+
+    /**
+     * Shuts down this runner, the leader election service and the job master.
+     *
+     * @param cause the reason for the shutdown; currently not used
+     */
+    public void shutdown(Throwable cause) {
+        // TODO what is the cause used for ?
+        shutdownInternally();
+    }
+
+    /**
+     * Marks the runner as shut down, stops the leader election service and shuts down the
+     * job master, all while holding the lock.
+     */
+    private void shutdownInternally() {
+        synchronized (lock) {
+            shutdown = true;
+
+            if (leaderElectionService != null) {
+                try {
+                    leaderElectionService.stop();
+                } catch (Exception e) {
+                    // Best effort: keep shutting down, but do not lose the failure cause.
+                    log.error("Could not properly shutdown the leader election service.", e);
+                }
+            }
+
+            jobManager.shutDown();
+        }
+    }
+
+    //----------------------------------------------------------------------------------------------
+    // Result and error handling methods
+    //----------------------------------------------------------------------------------------------
+
+    /**
+     * Job completion notification triggered by the JobManager. Shuts the runner down and then
+     * forwards the notification, even if the shutdown threw.
+     */
+    @Override
+    public void jobFinished(JobExecutionResult result) {
+        try {
+            shutdownInternally();
+        }
+        finally {
+            if (toNotify != null) {
+                toNotify.jobFinished(result);
+            }
+        }
+    }
+
+    /**
+     * Job failure notification triggered by the JobManager. Shuts the runner down and then
+     * forwards the notification, even if the shutdown threw.
+     */
+    @Override
+    public void jobFailed(Throwable cause) {
+        try {
+            shutdownInternally();
+        }
+        finally {
+            if (toNotify != null) {
+                toNotify.jobFailed(cause);
+            }
+        }
+    }
+
+    /**
+     * Job completion notification triggered by this runner itself. Shuts the runner down and
+     * then forwards the notification, even if the shutdown threw.
+     */
+    @Override
+    public void jobFinishedByOther() {
+        try {
+            shutdownInternally();
+        }
+        finally {
+            if (toNotify != null) {
+                toNotify.jobFinishedByOther();
+            }
+        }
+    }
+
+    /**
+     * Fatal error notification triggered by the JobManager or by this runner itself. The handler
+     * is notified first; the runner is shut down afterwards in any case.
+     */
+    @Override
+    public void onFatalError(Throwable exception) {
+        // first and in any case, notify our handler, so it can react fast
+        try {
+            if (toNotify != null) {
+                toNotify.onFatalError(exception);
+            }
+        }
+        finally {
+            log.error("JobManager runner encountered a fatal error.", exception);
+            shutdownInternally();
+        }
+    }
+
+    //----------------------------------------------------------------------------------------------
+    // Leadership methods
+    //----------------------------------------------------------------------------------------------
+
+    /**
+     * Confirms the granted leadership and, if leadership is still held afterwards, either reports
+     * the job as finished by others or starts the job. Does nothing once the runner is shut down.
+     */
+    @Override
+    public void grantLeadership(final UUID leaderSessionID) {
+        synchronized (lock) {
+            if (shutdown) {
+                log.info("JobManagerRunner already shutdown.");
+                return;
+            }
+
+            log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
+                jobGraph.getName(), jobGraph.getJobID(), leaderSessionID, getAddress());
+
+            // The operation may be blocking, but since this runner is idle before it has been granted
+            // the leadership, it is okay for the job manager to wait for the operation to complete.
+            leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+            this.leaderSessionID = leaderSessionID;
+
+            // Double check the leadership after confirming it: there is a small chance that multiple
+            // job managers schedule the same job if they try to recover at the same time.
+            // This will eventually be noticed, but cannot be ruled out from the beginning.
+            if (leaderElectionService.hasLeadership()) {
+                if (isJobFinishedByOthers()) {
+                    log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
+                    jobFinishedByOther();
+                } else {
+                    jobManager.getSelf().startJob();
+                }
+            }
+        }
+    }
+
+    /**
+     * Clears the leader session id and suspends the job. Does nothing once the runner is shut down.
+     */
+    @Override
+    public void revokeLeadership() {
+        synchronized (lock) {
+            if (shutdown) {
+                log.info("JobManagerRunner already shutdown.");
+                return;
+            }
+
+            log.info("JobManager for job {} ({}) was revoked leadership at {}.",
+                jobGraph.getName(), jobGraph.getJobID(), getAddress());
+
+            leaderSessionID = null;
+            jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader."));
+        }
+    }
+
+    /**
+     * Returns the address of the underlying job master.
+     */
+    @Override
+    public String getAddress() {
+        return jobManager.getAddress();
+    }
+
+    /**
+     * Treats an error reported by the leader election service as a fatal error.
+     */
+    @Override
+    public void handleError(Exception exception) {
+        log.error("Leader Election Service encountered a fatal error.", exception);
+        onFatalError(exception);
+    }
+
+    /**
+     * Tells whether the job was already finished by another party. Not implemented yet:
+     * always returns false.
+     */
+    @VisibleForTesting
+    boolean isJobFinishedByOthers() {
+        // TODO
+        return false;
+    }
+
+    /**
+     * Returns whether this runner has been shut down.
+     */
+    @VisibleForTesting
+    boolean isShutdown() {
+        return shutdown;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
new file mode 100644
index 0000000..e6beba6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class to hold all auxiliary services used by the {@link JobMaster}.
+ *
+ * <p>The services are exposed as public final fields; the constructor rejects null values
+ * for every one of them.
+ */
+public class JobManagerServices {
+
+ /** Executor service; the {@link JobManagerRunner} creates its scheduler from it */
+ public final ExecutorService executorService;
+
+ /** Library cache manager handed to the {@link JobMaster} */
+ public final BlobLibraryCacheManager libraryCacheManager;
+
+ /** Restart strategy factory handed to the {@link JobMaster} */
+ public final RestartStrategyFactory restartStrategyFactory;
+
+ /** Savepoint store handed to the {@link JobMaster} */
+ public final SavepointStore savepointStore;
+
+ /** Timeout handed to the {@link JobMaster} */
+ public final Time timeout;
+
+ /** Job manager metric group handed to the {@link JobMaster} */
+ public final JobManagerMetricGroup jobManagerMetricGroup;
+
+ /**
+ * Creates the holder from the given services. Every argument is checked with
+ * {@code checkNotNull}, so none of them may be null.
+ */
+ public JobManagerServices(
+ ExecutorService executorService,
+ BlobLibraryCacheManager libraryCacheManager,
+ RestartStrategyFactory restartStrategyFactory,
+ SavepointStore savepointStore,
+ Time timeout,
+ JobManagerMetricGroup jobManagerMetricGroup) {
+
+ this.executorService = checkNotNull(executorService);
+ this.libraryCacheManager = checkNotNull(libraryCacheManager);
+ this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
+ this.savepointStore = checkNotNull(savepointStore);
+ this.timeout = checkNotNull(timeout);
+ this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
+ }
+
+ // ------------------------------------------------------------------------
+ // Creating the components from a configuration
+ // ------------------------------------------------------------------------
+
+ /**
+ * Intended to create the services from the given configuration. Not implemented yet.
+ *
+ * @param config the configuration to create the services from (currently ignored)
+ * @return currently always null
+ */
+ public static JobManagerServices fromConfiguration(Configuration config) throws Exception {
+ // TODO not yet implemented
+ // NOTE(review): callers that dereference the result will fail with a NullPointerException
+ // until this method is implemented.
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1537396..b52a23c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,21 +18,50 @@
package org.apache.flink.runtime.jobmaster;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.util.Preconditions;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.duration.FiniteDuration;
-import java.util.UUID;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* JobMaster implementation. The job master is responsible for the execution of a single
@@ -41,7 +70,7 @@ import java.util.UUID;
* It offers the following methods as part of its rpc interface to interact with the JobMaster
* remotely:
* <ul>
- * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
+ * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
* given task</li>
* </ul>
*/
@@ -52,7 +81,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Logical representation of the job */
private final JobGraph jobGraph;
- private final JobID jobID;
/** Configuration of the job */
private final Configuration configuration;
@@ -60,32 +88,67 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Service to contend for and retrieve the leadership of JM and RM */
private final HighAvailabilityServices highAvailabilityServices;
- /** Leader Management */
- private LeaderElectionService leaderElectionService = null;
- private UUID leaderSessionID;
+ /** Blob cache manager used across jobs */
+ private final BlobLibraryCacheManager libraryCacheManager;
+
+ /** Factory to create restart strategy for this job */
+ private final RestartStrategyFactory restartStrategyFactory;
+
+ /** Store for save points */
+ private final SavepointStore savepointStore;
+
+ /** The timeout for this job */
+ private final Time timeout;
+
+ /** The scheduler to use for scheduling new tasks as they are needed */
+ private final Scheduler scheduler;
+
+ /** The metrics group used across jobs */
+ private final JobManagerMetricGroup jobManagerMetricGroup;
+
+ /** The execution context which is used to execute futures */
+ private final Executor executionContext;
+
+ private final OnCompletionActions jobCompletionActions;
+
+ /** The execution graph of this job */
+ private volatile ExecutionGraph executionGraph;
+
+ /** The checkpoint recovery factory used by this job */
+ private CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+ private ClassLoader userCodeLoader;
+
+ private RestartStrategy restartStrategy;
+
+ private MetricGroup jobMetrics;
- /**
- * The JM's Constructor
- *
- * @param jobGraph The representation of the job's execution plan
- * @param configuration The job's configuration
- * @param rpcService The RPC service at which the JM serves
- * @param highAvailabilityService The cluster's HA service from the JM can elect and retrieve leaders.
- */
public JobMaster(
JobGraph jobGraph,
Configuration configuration,
RpcService rpcService,
- HighAvailabilityServices highAvailabilityService) {
-
+ HighAvailabilityServices highAvailabilityService,
+ BlobLibraryCacheManager libraryCacheManager,
+ RestartStrategyFactory restartStrategyFactory,
+ SavepointStore savepointStore,
+ Time timeout,
+ Scheduler scheduler,
+ JobManagerMetricGroup jobManagerMetricGroup,
+ OnCompletionActions jobCompletionActions)
+ {
super(rpcService);
- this.jobGraph = Preconditions.checkNotNull(jobGraph);
- this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
-
- this.configuration = Preconditions.checkNotNull(configuration);
-
- this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService);
+ this.jobGraph = checkNotNull(jobGraph);
+ this.configuration = checkNotNull(configuration);
+ this.highAvailabilityServices = checkNotNull(highAvailabilityService);
+ this.libraryCacheManager = checkNotNull(libraryCacheManager);
+ this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
+ this.savepointStore = checkNotNull(savepointStore);
+ this.timeout = checkNotNull(timeout);
+ this.scheduler = checkNotNull(scheduler);
+ this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
+ this.executionContext = checkNotNull(rpcService.getExecutor());
+ this.jobCompletionActions = checkNotNull(jobCompletionActions);
}
public ResourceManagerGateway getResourceManager() {
@@ -93,93 +156,294 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
//----------------------------------------------------------------------------------------------
- // Initialization methods
+ // Lifecycle management
//----------------------------------------------------------------------------------------------
+
+ /**
+ * Initializes the job execution environment: registers the job's libraries, obtains the user code
+ * class loader, picks the restart strategy, registers the job metrics and fetches the checkpoint
+ * recovery factory. Should be called before start. Any error that occurs during initialization is
+ * treated as a job submission failure, and the job is unregistered from the library cache again.
+ *
+ * @throws JobSubmissionException Thrown if any initialization step fails; failures that are not
+ * already a JobSubmissionException are wrapped in one
+ */
+ public void init() throws JobSubmissionException {
+ log.info("Initializing job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+
+ try {
+ // IMPORTANT: We need to make sure that the library registration is the first action,
+ // because this makes sure that the uploaded jar files are removed in case of
+ // an unsuccessful initialization (see the unregisterJob call in the outer catch block)
+ try {
+ libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(),
+ jobGraph.getClasspaths());
+ } catch (Throwable t) {
+ throw new JobSubmissionException(jobGraph.getJobID(),
+ "Cannot set up the user code libraries: " + t.getMessage(), t);
+ }
+
+ // the class loader is needed below to deserialize the job's execution config
+ userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
+ if (userCodeLoader == null) {
+ throw new JobSubmissionException(jobGraph.getJobID(),
+ "The user code class loader could not be initialized.");
+ }
+
+ // a job without vertices is rejected
+ if (jobGraph.getNumberOfVertices() == 0) {
+ throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty");
+ }
+
+ // a restart strategy configured in the job takes precedence over the factory's default
+ final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+ jobGraph.getSerializedExecutionConfig()
+ .deserializeValue(userCodeLoader)
+ .getRestartStrategy();
+ if (restartStrategyConfiguration != null) {
+ restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
+ } else {
+ restartStrategy = restartStrategyFactory.createRestartStrategy();
+ }
+
+ log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+
+ // register the job's metrics; fall back to an unregistered group if none was returned
+ if (jobManagerMetricGroup != null) {
+ jobMetrics = jobManagerMetricGroup.addJob(jobGraph);
+ }
+ if (jobMetrics == null) {
+ jobMetrics = new UnregisteredMetricsGroup();
+ }
+
+ try {
+ checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
+ } catch (Exception e) {
+ log.error("Could not get the checkpoint recovery factory.", e);
+ throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e);
+ }
+
+ } catch (Throwable t) {
+ log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
+
+ // undo the library registration done as the first step
+ libraryCacheManager.unregisterJob(jobGraph.getJobID());
+
+ // rethrow submission failures as they are, wrap everything else
+ if (t instanceof JobSubmissionException) {
+ throw (JobSubmissionException) t;
+ } else {
+ throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
+ jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t);
+ }
+ }
+ }
+
+ @Override
 public void start() {
+ // only delegates to the parent class; the election registration that used to
+ // happen here has been removed from this method
 super.start();
-
- // register at the election once the JM starts
- registerAtElectionService();
 }
+ /**
+ * Shuts down this endpoint through the parent class and afterwards suspends the job,
+ * using a "JobManager is shutting down." exception as the cause.
+ */
+ @Override
+ public void shutDown() {
+ super.shutDown();
+
+ suspendJob(new Exception("JobManager is shutting down."));
+ }
//----------------------------------------------------------------------------------------------
- // JobMaster Leadership methods
+ // RPC methods
//----------------------------------------------------------------------------------------------
/**
- * Retrieves the election service and contend for the leadership.
+ * Start to run the job, runtime data structures like ExecutionGraph will be constructed now and checkpoint
+ * being recovered. After this, we will begin to schedule the job.
*/
- private void registerAtElectionService() {
- try {
- leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
- leaderElectionService.start(new JobMasterLeaderContender());
- } catch (Exception e) {
- throw new RuntimeException("Fail to register at the election of JobMaster", e);
+ @RpcMethod
+ public void startJob() {
+ log.info("Starting job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+
+ // Build the execution graph only if none exists yet. The check has to be "== null":
+ // with "!= null" a first start would skip the construction and the code below
+ // would dereference a null execution graph.
+ if (executionGraph == null) {
+ executionGraph = new ExecutionGraph(
+ ExecutionContext$.MODULE$.fromExecutor(executionContext),
+ jobGraph.getJobID(),
+ jobGraph.getName(),
+ jobGraph.getJobConfiguration(),
+ jobGraph.getSerializedExecutionConfig(),
+ new FiniteDuration(timeout.getSize(), timeout.getUnit()),
+ restartStrategy,
+ jobGraph.getUserJarBlobKeys(),
+ jobGraph.getClasspaths(),
+ userCodeLoader,
+ jobMetrics);
+ } else {
+ // an execution graph already exists and is reused
+ // TODO: update last active time in JobInfo
 }
- }
- /**
- * Start the execution when the leadership is granted.
- *
- * @param newLeaderSessionID The identifier of the new leadership session
- */
- public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID);
+ try {
+ executionGraph.setScheduleMode(jobGraph.getScheduleMode());
+ executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
+
+ try {
+ executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
+ } catch (Exception e) {
+ log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e);
+ executionGraph.setJsonPlan("{}");
+ }
- // The operation may be blocking, but since JM is idle before it grants the leadership, it's okay that
- // JM waits here for the operation's completeness.
- leaderSessionID = newLeaderSessionID;
- leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+ // initialize the vertices that have a master initialization hook
+ // file output formats create directories here, input formats create splits
+ if (log.isDebugEnabled()) {
+ log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
+ }
+ for (JobVertex vertex : jobGraph.getVertices()) {
+ final String executableClass = vertex.getInvokableClassName();
+ if (executableClass == null || executableClass.length() == 0) {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
+ }
+ if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+ vertex.setParallelism(scheduler.getTotalNumberOfSlots());
+ }
+
+ try {
+ vertex.initializeOnMaster(userCodeLoader);
+ } catch (Throwable t) {
+ throw new JobExecutionException(jobGraph.getJobID(),
+ "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
+ }
+ }
- // TODO:: execute the job when the leadership is granted.
+ // topologically sort the job vertices and attach the graph to the existing one
+ final List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
+ if (log.isDebugEnabled()) {
+ log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(),
+ jobGraph.getJobID(), jobGraph.getName());
}
- });
- }
+ executionGraph.attachJobGraph(sortedTopology);
- /**
- * Stop the execution when the leadership is revoked.
- */
- public void revokeJobMasterLeadership() {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("JobManager {} was revoked leadership.", getAddress());
+ if (log.isDebugEnabled()) {
+ log.debug("Successfully created execution graph from job graph {} ({}).",
+ jobGraph.getJobID(), jobGraph.getName());
+ }
- // TODO:: cancel the job's execution and notify all listeners
- cancelAndClearEverything(new Exception("JobManager is no longer the leader."));
+ final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
+ if (snapshotSettings != null) {
+ List<ExecutionJobVertex> triggerVertices = getExecutionJobVertexWithId(
+ executionGraph, snapshotSettings.getVerticesToTrigger());
+
+ List<ExecutionJobVertex> ackVertices = getExecutionJobVertexWithId(
+ executionGraph, snapshotSettings.getVerticesToAcknowledge());
+
+ List<ExecutionJobVertex> confirmVertices = getExecutionJobVertexWithId(
+ executionGraph, snapshotSettings.getVerticesToConfirm());
+
+ CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore(
+ jobGraph.getJobID(), userCodeLoader);
+
+ CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(
+ jobGraph.getJobID());
+
+ // Checkpoint stats tracker
+ boolean isStatsDisabled = configuration.getBoolean(
+ ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
+
+ final CheckpointStatsTracker checkpointStatsTracker;
+ if (isStatsDisabled) {
+ checkpointStatsTracker = new DisabledCheckpointStatsTracker();
+ } else {
+ int historySize = configuration.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+ checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics);
+ }
+
+ executionGraph.enableSnapshotCheckpointing(
+ snapshotSettings.getCheckpointInterval(),
+ snapshotSettings.getCheckpointTimeout(),
+ snapshotSettings.getMinPauseBetweenCheckpoints(),
+ snapshotSettings.getMaxConcurrentCheckpoints(),
+ triggerVertices,
+ ackVertices,
+ confirmVertices,
+ checkpointIdCounter,
+ completedCheckpoints,
+ savepointStore,
+ checkpointStatsTracker);
+ }
+
+ // TODO: register this class to execution graph as job status change listeners
+
+ // TODO: register client as job / execution status change listeners if they are interested
+
+ /*
+ TODO: decide whether we should take the savepoint before recovery
+
+ if (isRecovery) {
+ // this is a recovery of a master failure (this master takes over)
+ executionGraph.restoreLatestCheckpointedState();
+ } else {
+ if (snapshotSettings != null) {
+ String savepointPath = snapshotSettings.getSavepointPath();
+ if (savepointPath != null) {
+ // got a savepoint
+ log.info("Starting job from savepoint {}.", savepointPath);
+
+ // load the savepoint as a checkpoint into the system
+ final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
+ jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, savepointPath);
+ executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint);
+
+ // Reset the checkpoint ID counter
+ long nextCheckpointId = savepoint.getCheckpointID() + 1;
+ log.info("Reset the checkpoint ID to " + nextCheckpointId);
+ executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId);
- leaderSessionID = null;
+ executionGraph.restoreLatestCheckpointedState();
+ }
+ }
}
- });
- }
+ */
- /**
- * Handles error occurring in the leader election service
- *
- * @param exception Exception thrown in the leader election service
- */
- public void onJobMasterElectionError(final Exception exception) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("Received an error from the LeaderElectionService.", exception);
+ } catch (Throwable t) {
+ log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
- // TODO:: cancel the job's execution and shutdown the JM
- cancelAndClearEverything(exception);
+ executionGraph.fail(t);
+ executionGraph = null;
- leaderSessionID = null;
+ final Throwable rt;
+ if (t instanceof JobExecutionException) {
+ rt = (JobExecutionException) t;
+ } else {
+ rt = new JobExecutionException(jobGraph.getJobID(),
+ "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
}
- });
+ // TODO: notify client about this failure
+
+ jobCompletionActions.jobFailed(rt);
+ return;
+ }
+
+ // start scheduling job in another thread
+ executionContext.execute(new Runnable() {
+ @Override
+ public void run() {
+ if (executionGraph != null) {
+ try {
+ executionGraph.scheduleForExecution(scheduler);
+ } catch (Throwable t) {
+ executionGraph.fail(t);
+ }
+ }
+ }
+ });
}
- //----------------------------------------------------------------------------------------------
- // RPC methods
- //----------------------------------------------------------------------------------------------
+	/**
+	 * Suspends the job. If an execution graph currently exists, it is suspended with the given
+	 * cause and the reference to it is dropped; otherwise this call does nothing.
+	 *
+	 * @param cause The reason why this job is being suspended.
+	 */
+	@RpcMethod
+	public void suspendJob(final Throwable cause) {
+		if (executionGraph == null) {
+			// no execution graph, hence nothing to suspend
+			return;
+		}
+
+		executionGraph.suspend(cause);
+		executionGraph = null;
+	}
/**
* Updates the task execution state for a given task.
@@ -208,37 +472,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
//----------------------------------------------------------------------------------------------
/**
- * Cancel the current job and notify all listeners the job's cancellation.
+ * Converts JobVertexIDs to corresponding ExecutionJobVertexes
*
- * @param cause Cause for the cancelling.
+ * @param executionGraph The execution graph that holds the relationship
+ * @param vertexIDs The vertexIDs need to be converted
+ * @return The corresponding ExecutionJobVertexes
+ * @throws JobExecutionException
*/
- private void cancelAndClearEverything(Throwable cause) {
- // currently, nothing to do here
- }
-
- // ------------------------------------------------------------------------
- // Utility classes
- // ------------------------------------------------------------------------
- private class JobMasterLeaderContender implements LeaderContender {
-
- @Override
- public void grantLeadership(UUID leaderSessionID) {
- JobMaster.this.grantJobMasterLeadership(leaderSessionID);
- }
-
- @Override
- public void revokeLeadership() {
- JobMaster.this.revokeJobMasterLeadership();
- }
-
- @Override
- public String getAddress() {
- return JobMaster.this.getAddress();
- }
-
- @Override
- public void handleError(Exception exception) {
- onJobMasterElectionError(exception);
+ private static List<ExecutionJobVertex> getExecutionJobVertexWithId(
+ final ExecutionGraph executionGraph, final List<JobVertexID> vertexIDs)
+ throws JobExecutionException
+ {
+ // the result has one entry per requested ID, in the order of the given list
+ final List<ExecutionJobVertex> ret = new ArrayList<>(vertexIDs.size());
+ for (JobVertexID vertexID : vertexIDs) {
+ final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertexID);
+ // an ID that the execution graph does not know is reported as a job execution failure
+ if (executionJobVertex == null) {
+ throw new JobExecutionException(executionGraph.getJobID(),
+ "The snapshot checkpointing settings refer to non-existent vertex " + vertexID);
+ }
+ ret.add(executionJobVertex);
 }
+ return ret;
 }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 86bf17c..b281ea8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -29,6 +29,19 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
public interface JobMasterGateway extends RpcGateway {
/**
+ * Starts running this job.
+ */
+ void startJob();
+
+ /**
+ * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. Should re-submit
+ * the job before restarting it.
+ *
+ * @param cause The reason why this job has been suspended.
+ */
+ void suspendJob(final Throwable cause);
+
+ /**
* Updates the task execution state for a given task.
*
* @param taskExecutionState New task execution state for a given task
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..792bfd5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
+ * upon receiving jobs.
+ */
+public class MiniClusterJobDispatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+ // ------------------------------------------------------------------------
+
+ /** lock to ensure that this dispatcher executes only one job at a time */
+ private final Object lock = new Object();
+
+ /** the configuration with which the mini cluster was started */
+ private final Configuration configuration;
+
+ /** the RPC service to use by the job managers */
+ private final RpcService rpcService;
+
+ /** services for discovery, leader election, and recovery */
+ private final HighAvailabilityServices haServices;
+
+ /** all the services that the JobManager needs, such as BLOB service, factories, etc */
+ private final JobManagerServices jobManagerServices;
+
+ /** The number of JobManagers to launch (more than one simulates a high-availability setup) */
+ private final int numJobManagers;
+
+ /** The runner for the job and master. non-null if a job is currently running */
+ private volatile JobManagerRunner[] runners;
+
+ /** flag marking the dispatcher as shut down */
+ private volatile boolean shutdown;
+
+
+ /**
+ * Starts a mini cluster job dispatcher.
+ *
+ * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
+ * non-highly-available setup.
+ *
+ * @param config The configuration of the mini cluster
+ * @param rpcService The RPC service to use by the job managers
+ * @param haServices Access to the discovery, leader election, and recovery services
+ *
+ * @throws Exception Thrown, if the services for the JobMaster could not be started.
+ */
+ public MiniClusterJobDispatcher(
+ Configuration config,
+ RpcService rpcService,
+ HighAvailabilityServices haServices) throws Exception {
+ // delegate with a single JobManager per job
+ this(config, rpcService, haServices, 1);
+ }
+
+ /**
+ * Starts a mini cluster job dispatcher.
+ *
+ * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
+ * a highly-available setup.
+ *
+ * @param config The configuration of the mini cluster
+ * @param rpcService The RPC service to use by the job managers
+ * @param haServices Access to the discovery, leader election, and recovery services
+ * @param numJobManagers The number of JobMasters to start for each job; must be at least 1.
+ *
+ * @throws Exception Thrown, if the services for the JobMaster could not be started.
+ */
+ public MiniClusterJobDispatcher(
+ Configuration config,
+ RpcService rpcService,
+ HighAvailabilityServices haServices,
+ int numJobManagers) throws Exception {
+
+ // validate the arguments before any service is created
+ checkArgument(numJobManagers >= 1);
+ this.configuration = checkNotNull(config);
+ this.rpcService = checkNotNull(rpcService);
+ this.haServices = checkNotNull(haServices);
+ this.numJobManagers = numJobManagers;
+
+ LOG.info("Creating JobMaster services");
+ this.jobManagerServices = JobManagerServices.fromConfiguration(config);
+ }
+
+ // ------------------------------------------------------------------------
+ // life cycle
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the mini cluster dispatcher. Only the first call has an effect. If runners
+	 * for a job are currently registered, each of them is shut down with a
+	 * "The MiniCluster is shutting down" exception as the cause.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			if (shutdown) {
+				// a previous call already performed the shutdown
+				return;
+			}
+			shutdown = true;
+
+			LOG.info("Shutting down the dispatcher");
+
+			// work on a local copy of the reference, the field itself is cleared below
+			final JobManagerRunner[] activeRunners = this.runners;
+			if (activeRunners == null) {
+				return;
+			}
+			this.runners = null;
+
+			final Exception shutdownException = new Exception("The MiniCluster is shutting down");
+			for (JobManagerRunner activeRunner : activeRunners) {
+				activeRunner.shutdown(shutdownException);
+			}
+		}
+	}
+
+ // ------------------------------------------------------------------------
+ // submitting jobs
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Executes a job in detached mode. The method returns as soon as the JobManager runners
+	 * for the job have been started; it does not wait for the job to finish.
+	 *
+	 * @param job The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if the JobManager runners for the job could not be started.
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job);
+
+		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			// the finalizer clears the runners again once all of them have reported back
+			this.runners = startJobRunners(job, new DetachedFinalizer(numJobManagers));
+		}
+	}
+
+	/**
+	 * Runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * @param job The Flink job to execute
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 * @throws InterruptedException Thrown if the calling thread is interrupted while waiting
+	 *         for the job to complete.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job);
+
+		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+		final BlockingJobSync completionSync = new BlockingJobSync(job.getJobID(), numJobManagers);
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			this.runners = startJobRunners(job, completionSync);
+		}
+
+		final JobExecutionResult jobResult;
+		try {
+			jobResult = completionSync.getResult();
+		}
+		finally {
+			// always clear the status for the next job
+			this.runners = null;
+		}
+		return jobResult;
+	}
+
+	/**
+	 * Creates and starts one JobManagerRunner per configured JobManager for the given job.
+	 *
+	 * <p>If creating or starting any runner fails, all runners created so far (including the
+	 * one that failed to start, if it was created) are shut down before the failure is reported.
+	 *
+	 * @param job The job to start the runners for
+	 * @param onCompletion The completion callbacks handed to every runner
+	 * @return The started runners, one per JobManager
+	 *
+	 * @throws JobExecutionException Thrown if not all runners could be created and started.
+	 */
+	private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException {
+		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+		for (int i = 0; i < numJobManagers; i++) {
+			try {
+				runners[i] = new JobManagerRunner(job, configuration,
+						rpcService, haServices, jobManagerServices, onCompletion);
+				runners[i].start();
+			}
+			catch (Throwable t) {
+				// shut down all the ones so far
+				Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t);
+
+				// index with k (not i), otherwise only the failing runner would be shut down
+				// and the runners started before it would keep running
+				for (int k = 0; k <= i; k++) {
+					try {
+						if (runners[k] != null) {
+							runners[k].shutdown(shutdownCause);
+						}
+					} catch (Throwable ignored) {
+						// silent shutdown: the original start failure is what gets reported
+					}
+				}
+
+				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+			}
+		}
+
+		return runners;
+	}
+
+ // ------------------------------------------------------------------------
+ // test methods to simulate job master failures
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down one of the JobManager runners of the currently executing job, using a
+	 * "kill JobManager" throwable as the cause.
+	 *
+	 * @param which The index of the runner to shut down, from 0 (inclusive) to the number of
+	 *              JobManagers (exclusive)
+	 */
+	public void killJobMaster(int which) {
+		checkArgument(0 <= which && which < numJobManagers, "no such job master");
+		checkState(!shutdown, "mini cluster is shut down");
+
+		final JobManagerRunner[] currentRunners = this.runners;
+		checkState(currentRunners != null, "mini cluster it not executing a job right now");
+
+		final JobManagerRunner victim = currentRunners[which];
+		victim.shutdown(new Throwable("kill JobManager"));
+	}
+
+ // ------------------------------------------------------------------------
+ // utility classes
+ // ------------------------------------------------------------------------
+
+	/**
+	 * Completion callback for detached jobs. It counts the reports of the runners, no matter
+	 * whether a runner reports success, failure, completion by another JobManager, or a fatal
+	 * error. In the case of a high-availability test setup, there may be multiple runners.
+	 * Once the last runner has reported, the dispatcher's runners are cleared, which makes
+	 * the mini cluster ready to receive new jobs.
+	 */
+	private class DetachedFinalizer implements OnCompletionActions {
+
+		/** The number of runners that have not reported back yet. */
+		private final AtomicInteger pendingRunners;
+
+		private DetachedFinalizer(int numJobManagersToWaitFor) {
+			this.pendingRunners = new AtomicInteger(numJobManagersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			runnerReported();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			runnerReported();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			runnerReported();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			runnerReported();
+		}
+
+		/** Records one report and clears the dispatcher's runners after the last one. */
+		private void runnerReported() {
+			final int stillPending = pendingRunners.decrementAndGet();
+			if (stillPending == 0) {
+				MiniClusterJobDispatcher.this.runners = null;
+			}
+		}
+	}
+
+ // ------------------------------------------------------------------------
+
+	/**
+	 * This class is used to sync on blocking jobs across multiple runners.
+	 * Only after all runners reported back that they are finished, the
+	 * result will be released.
+	 *
+	 * <p>Every callback, including {@link #onFatalError(Throwable)}, counts as one report.
+	 * That way it is guaranteed that after the blocking job submit call returns,
+	 * the dispatcher is immediately free to accept another job, and that a caller
+	 * is not blocked forever when a runner dies with a fatal error.
+	 */
+	private static class BlockingJobSync implements OnCompletionActions {
+
+		private final JobID jobId;
+
+		/** Counted down once per runner report; {@link #getResult()} waits on it. */
+		private final CountDownLatch jobMastersToWaitFor;
+
+		private volatile Throwable jobException;
+
+		private volatile Throwable runnerException;
+
+		private volatile JobExecutionResult result;
+
+		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+			this.jobId = jobId;
+			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult jobResult) {
+			this.result = jobResult;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			jobException = cause;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			this.jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			// keep the first fatal error only
+			if (runnerException == null) {
+				runnerException = exception;
+			}
+
+			// a runner with a fatal error will not report anything else, so it must be
+			// counted here; otherwise getResult() would wait forever
+			jobMastersToWaitFor.countDown();
+		}
+
+		/**
+		 * Waits until all runners have reported back and then returns the job's result
+		 * or throws the job's failure.
+		 *
+		 * @return The result of the job execution
+		 *
+		 * @throws JobExecutionException Thrown if the job failed, or if no result is available
+		 *         and a runner reported a fatal error.
+		 * @throws InterruptedException Thrown if the waiting thread is interrupted.
+		 */
+		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
+			jobMastersToWaitFor.await();
+
+			final Throwable jobFailureCause = this.jobException;
+			final Throwable runnerException = this.runnerException;
+			final JobExecutionResult result = this.result;
+
+			// (1) we check if the job terminated with an exception
+			// (2) we check whether the job completed successfully
+			// (3) we check if we have exceptions from the JobManagers. the job may still have
+			//     completed successfully in that case, if multiple JobMasters were running
+			//     and other took over. only if all encounter a fatal error, the job cannot finish
+
+			if (jobFailureCause != null) {
+				if (jobFailureCause instanceof JobExecutionException) {
+					throw (JobExecutionException) jobFailureCause;
+				}
+				else {
+					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
+				}
+			}
+			else if (result != null) {
+				return result;
+			}
+			else if (runnerException != null) {
+				throw new JobExecutionException(jobId,
+						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
+			}
+			else {
+				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
new file mode 100644
index 0000000..7721117
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Callback interface through which a fatal error can be reported.
+ *
+ * NOTE(review): presumably implemented by components that have to react when an RPC
+ * endpoint cannot continue to run; confirm against the implementors.
+ */
+public interface FatalErrorHandler {
+
+ /**
+ * Reports a fatal error.
+ *
+ * @param exception The throwable describing the fatal error
+ */
+ void onFatalError(Throwable exception);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index cf709c8..9e3c3b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateServer;
@@ -340,6 +342,16 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
return null;
}
+
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+ return null;
+ }
+
+ @Override
+ public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+ return null;
+ }
};
// start all the TaskManager services (network stack, library cache, ...)
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 2ac43be..1a5450d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -19,10 +19,13 @@
package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* A variant of the HighAvailabilityServices for testing. Each individual service can be set
* to an arbitrary implementation, such as a mock or default service.
@@ -37,6 +40,9 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private volatile LeaderElectionService resourceManagerLeaderElectionService;
+ private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+ private volatile SubmittedJobGraphStore submittedJobGraphStore;
// ------------------------------------------------------------------------
// Setters for mock / testing implementations
@@ -58,6 +64,14 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
this.resourceManagerLeaderElectionService = leaderElectionService;
}
+ public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ }
+
+ public void setSubmittedJobGraphStore(SubmittedJobGraphStore submittedJobGraphStore) {
+ this.submittedJobGraphStore = submittedJobGraphStore;
+ }
+
// ------------------------------------------------------------------------
// HA Services Methods
// ------------------------------------------------------------------------
@@ -103,4 +117,27 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
}
}
+
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+ CheckpointRecoveryFactory factory = checkpointRecoveryFactory;
+
+ if (factory != null) {
+ return factory;
+ } else {
+ throw new IllegalStateException("CheckpointRecoveryFactory has not been set");
+ }
+ }
+
+ @Override
+ public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+ SubmittedJobGraphStore store = submittedJobGraphStore;
+
+ if (store != null) {
+ return store;
+ } else {
+ throw new IllegalStateException("SubmittedJobGraphStore has not been set");
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
new file mode 100644
index 0000000..dc3b5fd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobManagerRunner.class)
+public class JobManagerRunnerMockTest {
+
+ private JobManagerRunner runner;
+
+ private JobMaster jobManager;
+
+ private JobMasterGateway jobManagerGateway;
+
+ private LeaderElectionService leaderElectionService;
+
+ private TestingOnCompletionActions jobCompletion;
+
+ @Before
+ public void setUp() throws Exception {
+ jobManager = mock(JobMaster.class);
+ jobManagerGateway = mock(JobMasterGateway.class);
+ when(jobManager.getSelf()).thenReturn(jobManagerGateway);
+
+ PowerMockito.whenNew(JobMaster.class).withAnyArguments().thenReturn(jobManager);
+
+ jobCompletion = new TestingOnCompletionActions();
+
+ leaderElectionService = mock(LeaderElectionService.class);
+ when(leaderElectionService.hasLeadership()).thenReturn(true);
+
+ HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
+ when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+
+ runner = PowerMockito.spy(new JobManagerRunner(
+ new JobGraph("test"),
+ mock(Configuration.class),
+ mock(RpcService.class),
+ haServices,
+ mock(JobManagerServices.class),
+ jobCompletion));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testStartAndShutdown() throws Exception {
+ runner.start();
+ verify(jobManager).init();
+ verify(jobManager).start();
+ verify(leaderElectionService).start(runner);
+
+ assertFalse(jobCompletion.isJobFinished());
+ assertFalse(jobCompletion.isJobFailed());
+
+ runner.shutdown();
+ verify(leaderElectionService).stop();
+ verify(jobManager).shutDown();
+ }
+
+ @Test
+ public void testShutdownBeforeGrantLeadership() throws Exception {
+ runner.start();
+ verify(jobManager).init();
+ verify(jobManager).start();
+ verify(leaderElectionService).start(runner);
+
+ runner.shutdown();
+ verify(leaderElectionService).stop();
+ verify(jobManager).shutDown();
+
+ assertFalse(jobCompletion.isJobFinished());
+ assertFalse(jobCompletion.isJobFailed());
+
+ runner.grantLeadership(UUID.randomUUID());
+ assertFalse(jobCompletion.isJobFinished());
+ assertFalse(jobCompletion.isJobFailed());
+
+ }
+
+ @Test
+ public void testJobFinishedByOtherBeforeGrantLeadership() throws Exception {
+ runner.start();
+
+ when(runner.isJobFinishedByOthers()).thenReturn(true);
+ runner.grantLeadership(UUID.randomUUID());
+
+ // the runner should shut down automatically and report the job completion
+ verify(leaderElectionService).stop();
+ verify(jobManager).shutDown();
+
+ assertTrue(jobCompletion.isJobFinished());
+ assertTrue(jobCompletion.isJobFinishedByOther());
+ }
+
+ @Test
+ public void testJobFinished() throws Exception {
+ runner.start();
+
+ runner.grantLeadership(UUID.randomUUID());
+ verify(jobManagerGateway).startJob();
+ assertFalse(jobCompletion.isJobFinished());
+
+ // the runner is told by the JobManager that the job has finished
+ runner.jobFinished(mock(JobExecutionResult.class));
+
+ assertTrue(jobCompletion.isJobFinished());
+ assertFalse(jobCompletion.isJobFinishedByOther());
+ verify(leaderElectionService).stop();
+ verify(jobManager).shutDown();
+ assertTrue(runner.isShutdown());
+ }
+
+ @Test
+ public void testJobFailed() throws Exception {
+ runner.start();
+
+ runner.grantLeadership(UUID.randomUUID());
+ verify(jobManagerGateway).startJob();
+ assertFalse(jobCompletion.isJobFinished());
+
+ // the runner is told by the JobManager that the job has failed
+ runner.jobFailed(new Exception("failed manually"));
+
+ assertTrue(jobCompletion.isJobFailed());
+ verify(leaderElectionService).stop();
+ verify(jobManager).shutDown();
+ assertTrue(runner.isShutdown());
+ }
+
+ @Test
+ public void testLeadershipRevoked() throws Exception {
+ runner.start();
+
+ runner.grantLeadership(UUID.randomUUID());
+ verify(jobManagerGateway).startJob();
+ assertFalse(jobCompletion.isJobFinished());
+
+ runner.revokeLeadership();
+ verify(jobManagerGateway).suspendJob(any(Throwable.class));
+ assertFalse(runner.isShutdown());
+ }
+
+ @Test
+ public void testRegainLeadership() throws Exception {
+ runner.start();
+
+ runner.grantLeadership(UUID.randomUUID());
+ verify(jobManagerGateway).startJob();
+ assertFalse(jobCompletion.isJobFinished());
+
+ runner.revokeLeadership();
+ verify(jobManagerGateway).suspendJob(any(Throwable.class));
+ assertFalse(runner.isShutdown());
+
+ runner.grantLeadership(UUID.randomUUID());
+ verify(jobManagerGateway, times(2)).startJob();
+ }
+
+ private static class TestingOnCompletionActions implements OnCompletionActions {
+
+ private volatile JobExecutionResult result;
+
+ private volatile Throwable failedCause;
+
+ private volatile boolean finishedByOther;
+
+ @Override
+ public void jobFinished(JobExecutionResult result) {
+ checkArgument(!isJobFinished(), "job finished already");
+ checkArgument(!isJobFailed(), "job failed already");
+
+ this.result = result;
+ }
+
+ @Override
+ public void jobFailed(Throwable cause) {
+ checkArgument(!isJobFinished(), "job finished already");
+ checkArgument(!isJobFailed(), "job failed already");
+
+ this.failedCause = cause;
+ }
+
+ @Override
+ public void jobFinishedByOther() {
+ checkArgument(!isJobFinished(), "job finished already");
+ checkArgument(!isJobFailed(), "job failed already");
+
+ this.finishedByOther = true;
+ }
+
+ @Override
+ public void onFatalError(Throwable exception) {
+ jobFailed(exception);
+ }
+
+ boolean isJobFinished() {
+ return result != null || finishedByOther;
+ }
+
+ boolean isJobFinishedByOther() {
+ return finishedByOther;
+ }
+
+ boolean isJobFailed() {
+ return failedCause != null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c4814c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index 6363662..e05c8d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -19,23 +19,21 @@
package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem;
-import akka.util.Timeout;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.junit.AfterClass;
import org.junit.Test;
import scala.Option;
import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -57,19 +55,20 @@ public class RpcConnectionTest {
// we start the RPC service with a very long timeout to ensure that the test
// can only pass if the connection problem is not recognized merely via a timeout
- rpcService = new AkkaRpcService(actorSystem, new Timeout(10000000, TimeUnit.SECONDS));
+ rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
- Await.result(future, new FiniteDuration(10000000, TimeUnit.SECONDS));
+ future.get(10000000, TimeUnit.SECONDS);
fail("should never complete normally");
}
catch (TimeoutException e) {
fail("should not fail with a generic timeout exception");
}
- catch (RpcConnectionException e) {
+ catch (ExecutionException e) {
// that is what we want
- assertTrue("wrong error message", e.getMessage().contains("foo.bar.com.test.invalid"));
+ assertTrue(e.getCause() instanceof RpcConnectionException);
+ assertTrue("wrong error message", e.getCause().getMessage().contains("foo.bar.com.test.invalid"));
}
catch (Throwable t) {
fail("wrong exception: " + t);