You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:43:07 UTC
flink git commit: [FLINK-4657] Implement HighAvailabilityServices
based on ZooKeeper
Repository: flink
Updated Branches:
refs/heads/flip-6 626e67276 -> 96a5cb2dc
[FLINK-4657] Implement HighAvailabilityServices based on ZooKeeper
[FLINK-4657] Implement a few rpc calls for JobMaster
[FLINK-4657][cluster management] Address review comments
[FLINK-4657][cluster management] Throw exception when error occurred when request input split
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/96a5cb2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/96a5cb2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/96a5cb2d
Branch: refs/heads/flip-6
Commit: 96a5cb2dc3032448571ebbb8989dfb42f1908009
Parents: 626e672
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Sep 26 10:59:16 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:42:12 2016 +0200
----------------------------------------------------------------------
.../HighAvailabilityServices.java | 7 +-
.../runtime/highavailability/NonHaServices.java | 4 +-
.../highavailability/ZookeeperHaServices.java | 82 ++++++++++
.../StandaloneSubmittedJobGraphStore.java | 5 +
.../jobmanager/SubmittedJobGraphStore.java | 8 +
.../ZooKeeperSubmittedJobGraphStore.java | 7 +
.../runtime/jobmaster/JobManagerRunner.java | 18 +--
.../flink/runtime/jobmaster/JobMaster.java | 161 ++++++++++++++++++-
.../runtime/jobmaster/JobMasterGateway.java | 54 ++++++-
.../jobmaster/message/NextInputSplit.java | 39 +++++
.../jobmaster/message/PartitionState.java | 69 ++++++++
.../resourcemanager/ResourceManager.java | 6 +-
.../flink/runtime/util/ZooKeeperUtils.java | 82 ++++++++--
.../TestingHighAvailabilityServices.java | 20 +--
.../jobmanager/JobManagerHARecoveryTest.java | 3 +-
.../jobmaster/JobManagerRunnerMockTest.java | 11 +-
.../slotmanager/SlotProtocolTest.java | 2 +-
17 files changed, 520 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index d67e927..a26886a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
* <li>JobManager leader election and leader retrieval</li>
* <li>Persistence for checkpoint metadata</li>
* <li>Registering the latest completed checkpoint(s)</li>
+ * <li>Persistence for submitted job graph</li>
* </ul>
*/
public interface HighAvailabilityServices {
@@ -48,12 +49,10 @@ public interface HighAvailabilityServices {
* @return
* @throws Exception
*/
- LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception;
+ LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
/**
* Gets the leader election service for the cluster's resource manager.
- * @return
- * @throws Exception
*/
LeaderElectionService getResourceManagerLeaderElectionService() throws Exception;
@@ -62,7 +61,7 @@ public interface HighAvailabilityServices {
*
* @param jobID The identifier of the job running the election.
*/
- LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception;
+ LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception;
/**
* Gets the checkpoint recovery factory for the job manager
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index a2c9cc4..2c6295c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -79,7 +79,7 @@ public class NonHaServices implements HighAvailabilityServices {
}
@Override
- public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
}
@@ -89,7 +89,7 @@ public class NonHaServices implements HighAvailabilityServices {
}
@Override
- public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
return new StandaloneLeaderElectionService();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
new file mode 100644
index 0000000..d26b668
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} based on ZooKeeper.
+ */
+public class ZookeeperHaServices implements HighAvailabilityServices {
+
+ private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = "/resource-manager";
+
+ /** The ZooKeeper client to use */
+ private final CuratorFramework client;
+
+ /** The runtime configuration */
+ private final Configuration configuration;
+
+ public ZookeeperHaServices(final CuratorFramework client, final Configuration configuration) {
+ this.client = client;
+ this.configuration = configuration;
+ }
+
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+ }
+
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+ return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathSuffixForJob(jobID));
+ }
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ return ZooKeeperUtils.createLeaderElectionService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+ }
+
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+ return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathSuffixForJob(jobID));
+ }
+
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+ return new ZooKeeperCheckpointRecoveryFactory(client, configuration);
+ }
+
+ @Override
+ public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+ return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
+ }
+
+ private static String getPathSuffixForJob(final JobID jobID) {
+ return String.format("/job-managers/%s", jobID);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index 3041cde..00df935 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -62,4 +62,9 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
return Collections.emptyList();
}
+
+ @Override
+ public boolean contains(JobID jobId) throws Exception {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index bd628cd..4d544ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -64,6 +64,14 @@ public interface SubmittedJobGraphStore {
void removeJobGraph(JobID jobId) throws Exception;
/**
+ * Checks whether a job graph for the given {@link JobID} exists in this store.
+ *
+ * <p>The result also indicates whether the job has to be recovered before anything else is done with it,
+ * since all globally terminated jobs are removed from this store.
+ */
+ boolean contains(final JobID jobId) throws Exception;
+
+ /**
* A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
* multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index ec05f1e..92093c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -266,6 +266,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
}
}
+ @Override
+ public boolean contains(JobID jobId) throws Exception {
+ checkNotNull(jobId, "Job ID");
+ String path = getPathForJob(jobId);
+ return jobGraphsInZooKeeper.exists(path) != -1;
+ }
+
/**
* Monitors ZooKeeper for changes.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6944d85..a096932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,20 +21,18 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.rpc.RpcService;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
-import java.util.concurrent.Executor;
/**
* The runner for the job manager. It deals with job level leader election and make underlying job manager
@@ -52,11 +50,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
private final OnCompletionActions toNotify;
- /** The execution context which is used to execute futures */
- private final Executor executionContext;
-
- // TODO: use this to decide whether the job is finished by other
- private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+ /** Used to check whether a job needs to be run */
+ private final SubmittedJobGraphStore submittedJobGraphStore;
/** Leader election for this job */
private final LeaderElectionService leaderElectionService;
@@ -87,9 +82,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
{
this.jobGraph = jobGraph;
this.toNotify = toNotify;
- this.executionContext = rpcService.getExecutor();
- this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory();
- this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
+ this.submittedJobGraphStore = haServices.getSubmittedJobGraphStore();
+ this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
this.jobManager = new JobMaster(
jobGraph, configuration, rpcService, haServices,
@@ -271,7 +265,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
@VisibleForTesting
boolean isJobFinishedByOthers() {
- // TODO
+ // TODO: Fix
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1e01c55..64619c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -35,23 +37,32 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
+import org.apache.flink.runtime.jobmaster.message.PartitionState;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -61,13 +72,18 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.duration.FiniteDuration;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
@@ -491,9 +507,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
* @return Acknowledge the task execution state update
*/
@RpcMethod
- public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
- System.out.println("TaskExecutionState: " + taskExecutionState);
- return Acknowledge.get();
+ public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+ if (taskExecutionState == null) {
+ return false;
+ } else {
+ return executionGraph.updateState(taskExecutionState);
+ }
}
//----------------------------------------------------------------------------------------------\u2028
@@ -511,6 +530,140 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
});
}
+ @RpcMethod
+ public NextInputSplit requestNextInputSplit(
+ final JobVertexID vertexID,
+ final ExecutionAttemptID executionAttempt) throws Exception
+ {
+ final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+ if (execution == null) {
+ // can happen when the JobManager has already unregistered this execution upon a task failure,
+ // but the TaskManager has not yet become aware of that situation
+ if (log.isDebugEnabled()) {
+ log.debug("Can not find Execution for attempt {}.", executionAttempt);
+ }
+ // but we should make the TaskManager aware of this
+ throw new Exception("Can not find Execution for attempt " + executionAttempt);
+ }
+
+ final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+ if (vertex == null) {
+ log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+ throw new Exception("Cannot find execution vertex for vertex ID " + vertexID);
+ }
+
+ final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+ if (splitAssigner == null) {
+ log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+ throw new Exception("No InputSplitAssigner for vertex ID " + vertexID);
+ }
+
+ final Slot slot = execution.getAssignedResource();
+ final int taskId = execution.getVertex().getParallelSubtaskIndex();
+ final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+ final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Send next input split {}.", nextInputSplit);
+ }
+
+ try {
+ final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+ return new NextInputSplit(serializedInputSplit);
+ } catch (Exception ex) {
+ log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+ IOException reason = new IOException("Could not serialize the next input split of class " +
+ nextInputSplit.getClass() + ".", ex);
+ vertex.fail(reason);
+ throw reason;
+ }
+ }
+
+ @RpcMethod
+ public PartitionState requestPartitionState(
+ final ResultPartitionID partitionId,
+ final ExecutionAttemptID taskExecutionId,
+ final IntermediateDataSetID taskResultId)
+ {
+ final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+ final ExecutionState state = execution != null ? execution.getState() : null;
+ return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(), state);
+ }
+
+ @RpcMethod
+ public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
+ executionGraph.scheduleOrUpdateConsumers(partitionID);
+ }
+
+ //----------------------------------------------------------------------------------------------
+ // Internal methods
+ //----------------------------------------------------------------------------------------------
+
+ // TODO - wrap this as StatusListenerMessenger's callback with rpc main thread
+ private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+ final JobID jobID = executionGraph.getJobID();
+ final String jobName = executionGraph.getJobName();
+ log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
+
+ if (newJobStatus.isGloballyTerminalState()) {
+ // TODO set job end time in JobInfo
+
+ /*
+ TODO
+ if (jobInfo.sessionAlive) {
+ jobInfo.setLastActive()
+ val lastActivity = jobInfo.lastActive
+ context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+ // remove only if no activity occurred in the meantime
+ if (lastActivity == jobInfo.lastActive) {
+ self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+ }
+ }(context.dispatcher)
+ } else {
+ self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+ }
+ */
+
+ if (newJobStatus == JobStatus.FINISHED) {
+ try {
+ final Map<String, SerializedValue<Object>> accumulatorResults =
+ executionGraph.getAccumulatorsSerialized();
+ final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
+ jobID, 0, accumulatorResults // TODO get correct job duration
+ );
+ jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
+ } catch (Exception e) {
+ log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
+ final JobExecutionException exception = new JobExecutionException(
+ jobID, "Failed to retrieve accumulator results.", e);
+ // TODO should we also notify client?
+ jobCompletionActions.jobFailed(exception);
+ }
+ }
+ else if (newJobStatus == JobStatus.CANCELED) {
+ final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
+ final JobExecutionException exception = new JobExecutionException(
+ jobID, "Job was cancelled.", unpackedError);
+ // TODO should we also notify client?
+ jobCompletionActions.jobFailed(exception);
+ }
+ else if (newJobStatus == JobStatus.FAILED) {
+ final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
+ final JobExecutionException exception = new JobExecutionException(
+ jobID, "Job execution failed.", unpackedError);
+ // TODO should we also notify client?
+ jobCompletionActions.jobFailed(exception);
+ }
+ else {
+ final JobExecutionException exception = new JobExecutionException(
+ jobID, newJobStatus + " is not a terminal state.");
+ // TODO should we also notify client?
+ jobCompletionActions.jobFailed(exception);
+ throw new RuntimeException(exception);
+ }
+ }
+ }
+
private void notifyOfNewResourceManagerLeader(
final String resourceManagerAddress, final UUID resourceManagerLeaderId)
{
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 6587ccb..fe4c041 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -19,7 +19,15 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
+import org.apache.flink.runtime.jobmaster.message.PartitionState;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -47,7 +55,47 @@ public interface JobMasterGateway extends RpcGateway {
* Updates the task execution state for a given task.
*
* @param taskExecutionState New task execution state for a given task
- * @return Future acknowledge of the task execution state update
+ * @return Future flag of the task execution state update result
*/
- Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+ Future<Boolean> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+
+ /**
+ * Requests the next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
+ * as a {@link NextInputSplit} message.
+ *
+ * @param vertexID The job vertex id
+ * @param executionAttempt The execution attempt id
+ * @return The future of the input split. If there is no further input split, will return an empty object.
+ * @throws Exception if some error occurred or information mismatch.
+ */
+ Future<NextInputSplit> requestNextInputSplit(
+ final JobVertexID vertexID,
+ final ExecutionAttemptID executionAttempt) throws Exception;
+
+ /**
+ * Requests the current state of the partition.
+ * The state of a partition is currently bound to the state of the producing execution.
+ *
+ * @param partitionId The partition ID of the partition to request the state of.
+ * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
+ * @param taskResultId The input gate ID of the task requesting the partition state.
+ * @return The future of the partition state
+ */
+ Future<PartitionState> requestPartitionState(
+ final ResultPartitionID partitionId,
+ final ExecutionAttemptID taskExecutionId,
+ final IntermediateDataSetID taskResultId);
+
+ /**
+ * Notifies the JobManager about available data for a produced partition.
+ * <p>
+ * There is a call to this method for each {@link ExecutionVertex} instance once per produced
+ * {@link ResultPartition} instance, either when first producing data (for pipelined executions)
+ * or when all data has been produced (for staged executions).
+ * <p>
+ * The JobManager then can decide when to schedule the partition consumers of the given session.
+ *
+ * @param partitionID The partition which has already produced data
+ */
+ void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
new file mode 100644
index 0000000..fe511ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import java.io.Serializable;
+
+/**
+ * Contains the next input split for a task.
+ */
+public class NextInputSplit implements Serializable {
+
+ private static final long serialVersionUID = -1355784074565856240L;
+
+ private final byte[] splitData;
+
+ public NextInputSplit(final byte[] splitData) {
+ this.splitData = splitData;
+ }
+
+ public byte[] getSplitData() {
+ return splitData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/PartitionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/PartitionState.java
new file mode 100644
index 0000000..66630ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/PartitionState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+
+/**
+ * Answer to a {@code RequestPartitionState} message, carrying the state of the respective partition.
+ */
+public class PartitionState implements Serializable {
+ private static final long serialVersionUID = -3346485077312144516L;
+
+ private final ExecutionAttemptID taskExecutionId;
+
+ private final IntermediateDataSetID taskResultId;
+
+ private final IntermediateResultPartitionID partitionId;
+
+ private final ExecutionState state;
+
+ public PartitionState(
+ final ExecutionAttemptID taskExecutionId,
+ final IntermediateDataSetID taskResultId,
+ final IntermediateResultPartitionID partitionId,
+ final ExecutionState state)
+ {
+ this.taskExecutionId = taskExecutionId;
+ this.taskResultId = taskResultId;
+ this.partitionId = partitionId;
+ this.state = state;
+ }
+
+ public ExecutionAttemptID getTaskExecutionId() {
+ return taskExecutionId;
+ }
+
+ public IntermediateDataSetID getTaskResultId() {
+ return taskResultId;
+ }
+
+ public IntermediateResultPartitionID getPartitionId() {
+ return partitionId;
+ }
+
+ public ExecutionState getState() {
+ return state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f695de4..f45afa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -129,7 +129,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
try {
leaderElectionService.stop();
for (JobID jobID : jobMasterGateways.keySet()) {
- highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+ highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop();
}
super.shutDown();
} catch (Throwable e) {
@@ -179,7 +179,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
final LeaderConnectionInfo jobMasterLeaderInfo;
try {
jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
- highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
+ highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
@@ -203,7 +203,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
try {
- LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+ LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 91db564..5e69875 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -162,13 +162,46 @@ public class ZooKeeperUtils {
* @throws Exception
*/
public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
- Configuration configuration) throws Exception {
- CuratorFramework client = startCuratorFramework(configuration);
+ final Configuration configuration) throws Exception
+ {
+ final CuratorFramework client = startCuratorFramework(configuration);
+ return createLeaderRetrievalService(client, configuration);
+ }
+
+ /**
+ * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
+ *
+ * @param client The {@link CuratorFramework} ZooKeeper client to use
+ * @param configuration {@link Configuration} object containing the configuration values
+ * @return {@link ZooKeeperLeaderRetrievalService} instance.
+ * @throws Exception
+ */
+ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
+ final CuratorFramework client,
+ final Configuration configuration) throws Exception
+ {
+ return createLeaderRetrievalService(client, configuration, "");
+ }
+
+ /**
+ * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
+ *
+ * @param client The {@link CuratorFramework} ZooKeeper client to use
+ * @param configuration {@link Configuration} object containing the configuration values
+ * @param pathSuffix The suffix to append to the configured ZooKeeper leader path
+ * @return {@link ZooKeeperLeaderRetrievalService} instance.
+ * @throws Exception
+ */
+ public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
+ final CuratorFramework client,
+ final Configuration configuration,
+ final String pathSuffix) throws Exception
+ {
String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
- ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
- ConfigConstants.ZOOKEEPER_LEADER_PATH);
+ configuration,
+ ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+ ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+ ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
return new ZooKeeperLeaderRetrievalService(client, leaderPath);
}
@@ -201,16 +234,33 @@ public class ZooKeeperUtils {
CuratorFramework client,
Configuration configuration) throws Exception {
- String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
- ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
- ConfigConstants.ZOOKEEPER_LATCH_PATH);
- String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
- configuration,
- ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
- ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
- ConfigConstants.ZOOKEEPER_LEADER_PATH);
+ return createLeaderElectionService(client, configuration, "");
+ }
+
+ /**
+ * Creates a {@link ZooKeeperLeaderElectionService} instance.
+ *
+ * @param client The {@link CuratorFramework} ZooKeeper client to use
+ * @param configuration {@link Configuration} object containing the configuration values
+ * @param pathSuffix The suffix to append to both the configured ZooKeeper latch path and leader path
+ * @return {@link ZooKeeperLeaderElectionService} instance.
+ * @throws Exception
+ */
+ public static ZooKeeperLeaderElectionService createLeaderElectionService(
+ final CuratorFramework client,
+ final Configuration configuration,
+ final String pathSuffix) throws Exception
+ {
+ final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+ configuration,
+ ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
+ ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
+ ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix;
+ final String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+ configuration,
+ ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+ ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+ ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 1a5450d..faf69cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -36,7 +36,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
- private volatile LeaderElectionService jobMasterLeaderElectionService;
+ private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
private volatile LeaderElectionService resourceManagerLeaderElectionService;
@@ -56,8 +56,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
}
- public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
- this.jobMasterLeaderElectionService = leaderElectionService;
+ public void setJobMasterLeaderElectionService(JobID jobID, LeaderElectionService leaderElectionService) {
+ this.jobManagerLeaderElectionServices.put(jobID, leaderElectionService);
}
public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) {
@@ -87,7 +87,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
if (service != null) {
return service;
@@ -97,24 +97,24 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
- LeaderElectionService service = jobMasterLeaderElectionService;
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ LeaderElectionService service = resourceManagerLeaderElectionService;
if (service != null) {
return service;
} else {
- throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
+ throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
}
}
@Override
- public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
- LeaderElectionService service = resourceManagerLeaderElectionService;
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+ LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
if (service != null) {
return service;
} else {
- throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
+ throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index ef8e3bd..dc8de57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -402,7 +402,8 @@ public class JobManagerHARecoveryTest {
storedJobs.remove(jobId);
}
- boolean contains(JobID jobId) {
+ @Override
+ public boolean contains(JobID jobId) {
return storedJobs.containsKey(jobId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index bfe5f55..3a769bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.rpc.RpcService;
import org.junit.After;
@@ -57,6 +58,8 @@ public class JobManagerRunnerMockTest {
private LeaderElectionService leaderElectionService;
+ private SubmittedJobGraphStore submittedJobGraphStore;
+
private TestingOnCompletionActions jobCompletion;
@Before
@@ -72,8 +75,12 @@ public class JobManagerRunnerMockTest {
leaderElectionService = mock(LeaderElectionService.class);
when(leaderElectionService.hasLeadership()).thenReturn(true);
+ submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+ when(submittedJobGraphStore.contains(any(JobID.class))).thenReturn(true);
+
HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
- when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+ when(haServices.getJobManagerLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+ when(haServices.getSubmittedJobGraphStore()).thenReturn(submittedJobGraphStore);
runner = PowerMockito.spy(new JobManagerRunner(
new JobGraph("test"),
@@ -127,7 +134,7 @@ public class JobManagerRunnerMockTest {
public void testJobFinishedByOtherBeforeGrantLeadership() throws Exception {
runner.start();
- when(runner.isJobFinishedByOthers()).thenReturn(true);
+ when(submittedJobGraphStore.contains(any(JobID.class))).thenReturn(false);
runner.grantLeadership(UUID.randomUUID());
// runner should shutdown automatic and informed the job completion
http://git-wip-us.apache.org/repos/asf/flink/blob/96a5cb2d/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index e3018c9..805ea71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -237,7 +237,7 @@ public class SlotProtocolTest extends TestLogger {
testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
final TestingLeaderElectionService jmLeaderElectionService = new TestingLeaderElectionService();
- testingHA.setJobMasterLeaderElectionService(jmLeaderElectionService);
+ testingHA.setJobMasterLeaderElectionService(jobID, jmLeaderElectionService);
final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jmAddress, jmID);
testingHA.setJobMasterLeaderRetriever(jobID, jmLeaderRetrievalService);