You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:55 UTC

[50/50] [abbrv] flink git commit: [FLINK-4657] Implement HighAvailabilityServices based on ZooKeeper

[FLINK-4657] Implement HighAvailabilityServices based on ZooKeeper

[FLINK-4657] Implement a few rpc calls for JobMaster

[FLINK-4657][cluster management] Address review comments

[FLINK-4657][cluster management] Throw exception when error occurred when request input split


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7cfc1bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7cfc1bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7cfc1bb

Branch: refs/heads/flip-6
Commit: f7cfc1bb66affab43bb9e2643d48ac5bfc28fe3e
Parents: ba2b590
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Sep 26 10:59:16 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:54:52 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |   7 +-
 .../runtime/highavailability/NonHaServices.java |   4 +-
 .../highavailability/ZookeeperHaServices.java   |  82 ++++++++++
 .../StandaloneSubmittedJobGraphStore.java       |   5 +
 .../jobmanager/SubmittedJobGraphStore.java      |   8 +
 .../ZooKeeperSubmittedJobGraphStore.java        |   7 +
 .../runtime/jobmaster/JobManagerRunner.java     |  18 +--
 .../flink/runtime/jobmaster/JobMaster.java      | 161 ++++++++++++++++++-
 .../runtime/jobmaster/JobMasterGateway.java     |  54 ++++++-
 .../jobmaster/message/NextInputSplit.java       |  39 +++++
 .../resourcemanager/ResourceManager.java        |   6 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |  82 ++++++++--
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../TestingHighAvailabilityServices.java        |  20 +--
 .../jobmanager/JobManagerHARecoveryTest.java    |   3 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |  11 +-
 .../slotmanager/SlotProtocolTest.java           |   2 +-
 17 files changed, 455 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index d67e927..a26886a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
  *     <li>JobManager leader election and leader retrieval</li>
  *     <li>Persistence for checkpoint metadata</li>
  *     <li>Registering the latest completed checkpoint(s)</li>
+ *     <li>Persistence for submitted job graph</li>
  * </ul>
  */
 public interface HighAvailabilityServices {
@@ -48,12 +49,10 @@ public interface HighAvailabilityServices {
 	 * @return
 	 * @throws Exception
 	 */
-	LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception;
+	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
 
 	/**
 	 * Gets the leader election service for the cluster's resource manager.
-	 * @return
-	 * @throws Exception
 	 */
 	LeaderElectionService getResourceManagerLeaderElectionService() throws Exception;
 
@@ -62,7 +61,7 @@ public interface HighAvailabilityServices {
 	 *
 	 * @param jobID The identifier of the job running the election.
 	 */
-	LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception;
+	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception;
 
 	/**
 	 * Gets the checkpoint recovery factory for the job manager

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index a2c9cc4..2c6295c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -79,7 +79,7 @@ public class NonHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
 		return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
 	}
 
@@ -89,7 +89,7 @@ public class NonHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
 		return new StandaloneLeaderElectionService();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
new file mode 100644
index 0000000..d26b668
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} backed by ZooKeeper.
+ */
+public class ZookeeperHaServices implements HighAvailabilityServices {
+
+	private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = "/resource-manager";
+
+	/** The ZooKeeper client to use */
+	private final CuratorFramework client;
+
+	/** The runtime configuration */
+	private final Configuration configuration;
+
+	public ZookeeperHaServices(final CuratorFramework client, final Configuration configuration) {
+		this.client = client;
+		this.configuration = configuration;
+	}
+
+	@Override
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+	}
+
+	@Override
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathSuffixForJob(jobID));
+	}
+
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+		return ZooKeeperUtils.createLeaderElectionService(client, configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+	}
+
+	@Override
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathSuffixForJob(jobID));
+	}
+
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+		return new ZooKeeperCheckpointRecoveryFactory(client, configuration);
+	}
+
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
+	}
+
+	private static String getPathSuffixForJob(final JobID jobID) {
+		return String.format("/job-managers/%s", jobID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index 3041cde..00df935 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -62,4 +62,9 @@ public class StandaloneSubmittedJobGraphStore implements SubmittedJobGraphStore
 	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
 		return Collections.emptyList();
 	}
+
+	@Override
+	public boolean contains(JobID jobId) throws Exception {
+		return false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index bd628cd..4d544ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -64,6 +64,14 @@ public interface SubmittedJobGraphStore {
 	void removeJobGraph(JobID jobId) throws Exception;
 
 	/**
+	 * Checks whether the given {@link JobID} exists in this store.
+	 *
+	 * <p>This also serves as a flag indicating whether the job should be recovered before anything else is done,
+	 * since all globally terminated jobs are removed from this store.
+	 */
+	boolean contains(final JobID jobId) throws Exception;
+
+	/**
 	 * A listener for {@link SubmittedJobGraph} instances. This is used to react to races between
 	 * multiple running {@link SubmittedJobGraphStore} instances (on multiple job managers).
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index ec05f1e..92093c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -266,6 +266,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
 		}
 	}
 
+	@Override
+	public boolean contains(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		String path = getPathForJob(jobId);
+		return jobGraphsInZooKeeper.exists(path) != -1;
+	}
+
 	/**
 	 * Monitors ZooKeeper for changes.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6944d85..a096932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,20 +21,18 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.RpcService;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 /**
  * The runner for the job manager. It deals with job level leader election and make underlying job manager
@@ -52,11 +50,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 
 	private final OnCompletionActions toNotify;
 
-	/** The execution context which is used to execute futures */
-	private final Executor executionContext;
-
-	// TODO: use this to decide whether the job is finished by other
-	private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+	/** Used to check whether a job needs to be run */
+	private final SubmittedJobGraphStore submittedJobGraphStore;
 
 	/** Leader election for this job */
 	private final LeaderElectionService leaderElectionService;
@@ -87,9 +82,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 	{
 		this.jobGraph = jobGraph;
 		this.toNotify = toNotify;
-		this.executionContext = rpcService.getExecutor();
-		this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory();
-		this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
+		this.submittedJobGraphStore = haServices.getSubmittedJobGraphStore();
+		this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
 		this.jobManager = new JobMaster(
 			jobGraph, configuration, rpcService, haServices,
@@ -271,7 +265,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 
 	@VisibleForTesting
 	boolean isJobFinishedByOthers() {
-		// TODO
+		// TODO: Fix
 		return false;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1e01c55..e67a167 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -35,23 +37,32 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -61,13 +72,18 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 
@@ -491,9 +507,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 * @return Acknowledge the task execution state update
 	 */
 	@RpcMethod
-	public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
-		System.out.println("TaskExecutionState: " + taskExecutionState);
-		return Acknowledge.get();
+	public boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
+		if (taskExecutionState == null) {
+			return false;
+		} else {
+			return executionGraph.updateState(taskExecutionState);
+		}
 	}
 
 	//----------------------------------------------------------------------------------------------\u2028
@@ -511,6 +530,140 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		});
 	}
 
+	@RpcMethod
+	public NextInputSplit requestNextInputSplit(
+		final JobVertexID vertexID,
+		final ExecutionAttemptID executionAttempt) throws Exception
+	{
+		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
+		if (execution == null) {
+			// can happen when the JobManager has already unregistered this execution upon task failure,
+			// but the TaskManager has not yet become aware of that situation
+			if (log.isDebugEnabled()) {
+				log.debug("Can not find Execution for attempt {}.", executionAttempt);
+			}
+			// but we should make the TaskManager aware of this
+			throw new Exception("Can not find Execution for attempt " + executionAttempt);
+		}
+
+		final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
+		if (vertex == null) {
+			log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
+			throw new Exception("Cannot find execution vertex for vertex ID " + vertexID);
+		}
+
+		final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
+		if (splitAssigner == null) {
+			log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
+			throw new Exception("No InputSplitAssigner for vertex ID " + vertexID);
+		}
+
+		final Slot slot = execution.getAssignedResource();
+		final int taskId = execution.getVertex().getParallelSubtaskIndex();
+		final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+		final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+
+		if (log.isDebugEnabled()) {
+			log.debug("Send next input split {}.", nextInputSplit);
+		}
+
+		try {
+			final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
+			return new NextInputSplit(serializedInputSplit);
+		} catch (Exception ex) {
+			log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
+			IOException reason = new IOException("Could not serialize the next input split of class " +
+				nextInputSplit.getClass() + ".", ex);
+			vertex.fail(reason);
+			throw reason;
+		}
+	}
+
+	@RpcMethod
+	public PartitionState requestPartitionState(
+		final ResultPartitionID partitionId,
+		final ExecutionAttemptID taskExecutionId,
+		final IntermediateDataSetID taskResultId)
+	{
+		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+		final ExecutionState state = execution != null ? execution.getState() : null;
+		return new PartitionState(taskResultId, partitionId.getPartitionId(), state);
+	}
+
+	@RpcMethod
+	public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
+		executionGraph.scheduleOrUpdateConsumers(partitionID);
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Internal methods
+	//----------------------------------------------------------------------------------------------
+
+	// TODO - wrap this as StatusListenerMessenger's callback with rpc main thread
+	private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+		final JobID jobID = executionGraph.getJobID();
+		final String jobName = executionGraph.getJobName();
+		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
+
+		if (newJobStatus.isGloballyTerminalState()) {
+			// TODO set job end time in JobInfo
+
+			/*
+			  TODO
+			  if (jobInfo.sessionAlive) {
+                jobInfo.setLastActive()
+                val lastActivity = jobInfo.lastActive
+                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+                  // remove only if no activity occurred in the meantime
+                  if (lastActivity == jobInfo.lastActive) {
+                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+                  }
+                }(context.dispatcher)
+              } else {
+                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
+              }
+			 */
+
+			if (newJobStatus == JobStatus.FINISHED) {
+				try {
+					final Map<String, SerializedValue<Object>> accumulatorResults =
+						executionGraph.getAccumulatorsSerialized();
+					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
+						jobID, 0, accumulatorResults // TODO get correct job duration
+					);
+					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
+				} catch (Exception e) {
+					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
+					final JobExecutionException exception = new JobExecutionException(
+						jobID, "Failed to retrieve accumulator results.", e);
+					// TODO should we also notify client?
+					jobCompletionActions.jobFailed(exception);
+				}
+			}
+			else if (newJobStatus == JobStatus.CANCELED) {
+				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
+				final JobExecutionException exception = new JobExecutionException(
+					jobID, "Job was cancelled.", unpackedError);
+				// TODO should we also notify client?
+				jobCompletionActions.jobFailed(exception);
+			}
+			else if (newJobStatus == JobStatus.FAILED) {
+				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
+				final JobExecutionException exception = new JobExecutionException(
+					jobID, "Job execution failed.", unpackedError);
+				// TODO should we also notify client?
+				jobCompletionActions.jobFailed(exception);
+			}
+			else {
+				final JobExecutionException exception = new JobExecutionException(
+					jobID, newJobStatus + " is not a terminal state.");
+				// TODO should we also notify client?
+				jobCompletionActions.jobFailed(exception);
+				throw new RuntimeException(exception);
+			}
+		}
+	}
+
 	private void notifyOfNewResourceManagerLeader(
 		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
 	{

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 6587ccb..686a3f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -19,7 +19,15 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
@@ -47,7 +55,47 @@ public interface JobMasterGateway extends RpcGateway {
 	 * Updates the task execution state for a given task.
 	 *
 	 * @param taskExecutionState New task execution state for a given task
-	 * @return Future acknowledge of the task execution state update
+	 * @return Future flag of the task execution state update result
 	 */
-	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+	Future<Boolean> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+
+	/**
+	 * Requests the next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
+	 * as a {@link NextInputSplit} message.
+	 *
+	 * @param vertexID         The job vertex id
+	 * @param executionAttempt The execution attempt id
+	 * @return The future of the input split. If there is no further input split, an empty object is returned.
+	 * @throws Exception if an error occurs or the provided information does not match.
+	 */
+	Future<NextInputSplit> requestNextInputSplit(
+		final JobVertexID vertexID,
+		final ExecutionAttemptID executionAttempt) throws Exception;
+
+	/**
+	 * Requests the current state of the partition.
+	 * The state of a partition is currently bound to the state of the producing execution.
+	 *
+	 * @param partitionId     The partition ID of the partition to request the state of.
+	 * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
+	 * @param taskResultId    The input gate ID of the task requesting the partition state.
+	 * @return The future of the partition state
+	 */
+	Future<PartitionState> requestPartitionState(
+		final ResultPartitionID partitionId,
+		final ExecutionAttemptID taskExecutionId,
+		final IntermediateDataSetID taskResultId);
+
+	/**
+	 * Notifies the JobManager about available data for a produced partition.
+	 * <p>
+	 * There is a call to this method for each {@link ExecutionVertex} instance once per produced
+	 * {@link ResultPartition} instance, either when first producing data (for pipelined executions)
+	 * or when all data has been produced (for staged executions).
+	 * <p>
+	 * The JobManager then can decide when to schedule the partition consumers of the given session.
+	 *
+	 * @param partitionID The partition which has already produced data
+	 */
+	void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
new file mode 100644
index 0000000..fe511ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import java.io.Serializable;
+
+/**
+ * Contains the next input split for a task.
+ */
+public class NextInputSplit implements Serializable {
+
+	private static final long serialVersionUID = -1355784074565856240L;
+
+	private final byte[] splitData;
+
+	public NextInputSplit(final byte[] splitData) {
+		this.splitData = splitData;
+	}
+
+	public byte[] getSplitData() {
+		return splitData;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f695de4..f45afa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -129,7 +129,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		try {
 			leaderElectionService.stop();
 			for (JobID jobID : jobMasterGateways.keySet()) {
-				highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+				highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop();
 			}
 			super.shutDown();
 		} catch (Throwable e) {
@@ -179,7 +179,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 					final LeaderConnectionInfo jobMasterLeaderInfo;
 					try {
 						jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-							highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
+							highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
 					} catch (Exception e) {
 						log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
 						throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
@@ -203,7 +203,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 						if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
 							JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
 							try {
-								LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+								LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
 								jobMasterLeaderRetriever.start(jobMasterLeaderListener);
 							} catch (Exception e) {
 								log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 91db564..5e69875 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -162,13 +162,46 @@ public class ZooKeeperUtils {
 	 * @throws Exception
 	 */
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
-			Configuration configuration) throws Exception {
-		CuratorFramework client = startCuratorFramework(configuration);
+		final Configuration configuration) throws Exception
+	{
+		final CuratorFramework client = startCuratorFramework(configuration);
+		return createLeaderRetrievalService(client, configuration);
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
+	 *
+	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
+	 * @param configuration {@link Configuration} object containing the configuration values
+	 * @return {@link ZooKeeperLeaderRetrievalService} instance.
+	 * @throws Exception
+	 */
+	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
+		final CuratorFramework client,
+		final Configuration configuration) throws Exception
+	{
+		return createLeaderRetrievalService(client, configuration, "");
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
+	 *
+	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
+	 * @param configuration {@link Configuration} object containing the configuration values
+	 * @param pathSuffix    The suffix appended to the configured leader path
+	 * @return {@link ZooKeeperLeaderRetrievalService} instance.
+	 * @throws Exception
+	 */
+	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
+		final CuratorFramework client,
+		final Configuration configuration,
+		final String pathSuffix) throws Exception
+	{
 		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
-				ConfigConstants.ZOOKEEPER_LEADER_PATH);
+			configuration,
+			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+			ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
 		return new ZooKeeperLeaderRetrievalService(client, leaderPath);
 	}
@@ -201,16 +234,33 @@ public class ZooKeeperUtils {
 			CuratorFramework client,
 			Configuration configuration) throws Exception {
 
-		String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
-				ConfigConstants.ZOOKEEPER_LATCH_PATH);
-		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
-				configuration,
-				ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-				ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
-				ConfigConstants.ZOOKEEPER_LEADER_PATH);
+		return createLeaderElectionService(client, configuration, "");
+	}
+
+	/**
+	 * Creates a {@link ZooKeeperLeaderElectionService} instance.
+	 *
+	 * @param client        The {@link CuratorFramework} ZooKeeper client to use
+	 * @param configuration {@link Configuration} object containing the configuration values
+	 * @param pathSuffix    The suffix appended to both the configured latch path and leader path
+	 * @return {@link ZooKeeperLeaderElectionService} instance.
+	 * @throws Exception
+	 */
+	public static ZooKeeperLeaderElectionService createLeaderElectionService(
+		final CuratorFramework client,
+		final Configuration configuration,
+		final String pathSuffix) throws Exception
+	{
+		final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+			configuration,
+			ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
+			ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix;
+		final String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
+			configuration,
+			ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+			ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+			ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
 		return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d16c1b0..7a764ca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -52,6 +52,8 @@ import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -944,12 +946,12 @@ class TaskManager(
 
     val partitionStateChecker = new ActorGatewayPartitionStateChecker(
       jobManagerGateway,
-      config.timeout)
+      new FiniteDuration(config.getTimeout().toMilliseconds, TimeUnit.MILLISECONDS))
 
     val resultPartitionConsumableNotifier = new ActorGatewayResultPartitionConsumableNotifier(
       context.dispatcher,
       jobManagerGateway,
-      config.timeout)
+      new FiniteDuration(config.getTimeout().toMilliseconds, TimeUnit.MILLISECONDS))
 
     connectionUtils = Some(
       (checkpointResponder,

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 1a5450d..faf69cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -36,7 +36,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
 
-	private volatile LeaderElectionService jobMasterLeaderElectionService;
+	private ConcurrentHashMap<JobID, LeaderElectionService> jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
 
 	private volatile LeaderElectionService resourceManagerLeaderElectionService;
 
@@ -56,8 +56,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
 	}
 
-	public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
-		this.jobMasterLeaderElectionService = leaderElectionService;
+	public void setJobMasterLeaderElectionService(JobID jobID, LeaderElectionService leaderElectionService) {
+		this.jobManagerLeaderElectionServices.put(jobID, leaderElectionService);
 	}
 
 	public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) {
@@ -87,7 +87,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
 		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
 		if (service != null) {
 			return service;
@@ -97,24 +97,24 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
-		LeaderElectionService service = jobMasterLeaderElectionService;
+	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+		LeaderElectionService service = resourceManagerLeaderElectionService;
 
 		if (service != null) {
 			return service;
 		} else {
-			throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
+			throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
 		}
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
-		LeaderElectionService service = resourceManagerLeaderElectionService;
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+		LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
 
 		if (service != null) {
 			return service;
 		} else {
-			throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
+			throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 9b12cac..78bc15a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -406,7 +406,8 @@ public class JobManagerHARecoveryTest {
 			storedJobs.remove(jobId);
 		}
 
-		boolean contains(JobID jobId) {
+		@Override
+		public boolean contains(JobID jobId) {
 			return storedJobs.containsKey(jobId);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index bfe5f55..3a769bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.junit.After;
@@ -57,6 +58,8 @@ public class JobManagerRunnerMockTest {
 
 	private LeaderElectionService leaderElectionService;
 
+	private SubmittedJobGraphStore submittedJobGraphStore;
+
 	private TestingOnCompletionActions jobCompletion;
 
 	@Before
@@ -72,8 +75,12 @@ public class JobManagerRunnerMockTest {
 		leaderElectionService = mock(LeaderElectionService.class);
 		when(leaderElectionService.hasLeadership()).thenReturn(true);
 
+		submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+		when(submittedJobGraphStore.contains(any(JobID.class))).thenReturn(true);
+
 		HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
-		when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+		when(haServices.getJobManagerLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+		when(haServices.getSubmittedJobGraphStore()).thenReturn(submittedJobGraphStore);
 
 		runner = PowerMockito.spy(new JobManagerRunner(
 			new JobGraph("test"),
@@ -127,7 +134,7 @@ public class JobManagerRunnerMockTest {
 	public void testJobFinishedByOtherBeforeGrantLeadership() throws Exception {
 		runner.start();
 
-		when(runner.isJobFinishedByOthers()).thenReturn(true);
+		when(submittedJobGraphStore.contains(any(JobID.class))).thenReturn(false);
 		runner.grantLeadership(UUID.randomUUID());
 
 		// runner should shutdown automatic and informed the job completion

http://git-wip-us.apache.org/repos/asf/flink/blob/f7cfc1bb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index e3018c9..805ea71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -237,7 +237,7 @@ public class SlotProtocolTest extends TestLogger {
 		testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 
 		final TestingLeaderElectionService jmLeaderElectionService = new TestingLeaderElectionService();
-		testingHA.setJobMasterLeaderElectionService(jmLeaderElectionService);
+		testingHA.setJobMasterLeaderElectionService(jobID, jmLeaderElectionService);
 		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jmAddress, jmID);
 		testingHA.setJobMasterLeaderRetriever(jobID, jmLeaderRetrievalService);