You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/21 20:02:21 UTC

[1/4] flink git commit: [FLINK-4537] [cluster management] ResourceManager registration with JobManager

Repository: flink
Updated Branches:
  refs/heads/flip-6 b5f6a06b0 -> 4076bd748


[FLINK-4537] [cluster management] ResourceManager registration with JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b2271bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b2271bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b2271bd

Branch: refs/heads/flip-6
Commit: 8b2271bdd8197e4ca051bc4c9c3d90de88c986ce
Parents: b5f6a06
Author: beyond1920 <be...@126.com>
Authored: Thu Sep 1 15:27:20 2016 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 21 21:53:16 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |   9 ++
 .../runtime/highavailability/NonHaServices.java |  19 +++
 .../jobmaster/JobMasterRegistrationSuccess.java |  49 ++++++
 .../resourcemanager/JobMasterRegistration.java  |  39 ++++-
 .../resourcemanager/ResourceManager.java        | 125 +++++++++++++--
 .../resourcemanager/ResourceManagerGateway.java |  34 ++--
 .../exceptions/LeaderSessionIDException.java    |  60 +++++++
 .../runtime/taskexecutor/TaskExecutor.java      |   5 +
 .../TestingHighAvailabilityServices.java        |  17 ++
 .../resourcemanager/ResourceManagerTest.java    | 160 +++++++++++++++++++
 10 files changed, 483 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 298147c..7634176 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -40,6 +40,15 @@ public interface HighAvailabilityServices {
 	LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
 
 	/**
+	 * Gets the leader retriever for the JobMaster which is responsible for the given job.
+	 *
+	 * @param jobID The identifier of the job.
+	 * @return
+	 * @throws Exception
+	 */
+	LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception;
+
+	/**
 	 * Gets the leader election service for the cluster's resource manager.
 	 * @return
 	 * @throws Exception

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 292a404..33dc2d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -42,6 +43,8 @@ public class NonHaServices implements HighAvailabilityServices {
 	/** The fix address of the ResourceManager */
 	private final String resourceManagerAddress;
 
+	private final ConcurrentHashMap<JobID, String> jobMastersAddress;
+
 	/**
 	 * Creates a new services class for the fix pre-defined leaders.
 	 * 
@@ -49,6 +52,17 @@ public class NonHaServices implements HighAvailabilityServices {
 	 */
 	public NonHaServices(String resourceManagerAddress) {
 		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
+		this.jobMastersAddress = new ConcurrentHashMap<>(16);
+	}
+
+	/**
+	 * Binds the address of the specified job master.
+	 *
+	 * @param jobID            JobID for the specified job master
+	 * @param jobMasterAddress address for the specified job master
+	 */
+	public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) {
+		jobMastersAddress.put(jobID, jobMasterAddress);
 	}
 
 	// ------------------------------------------------------------------------
@@ -61,6 +75,11 @@ public class NonHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
+	public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+		return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
+	}
+
+	@Override
 	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
 		return new StandaloneLeaderElectionService();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
new file mode 100644
index 0000000..031c38e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.registration.RegistrationResponse;
+
+/**
+ * Successful response from the ResourceManager to a registration attempt by a JobMaster.
+ *
+ * <p>Carries the heartbeat interval that the ResourceManager reports back to the JobMaster.
+ */
+public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {
+
+	private static final long serialVersionUID = 5577641250204140415L;
+
+	/** The heartbeat interval; the unit is not stated here (presumably milliseconds -- TODO confirm). */
+	private final long heartbeatInterval;
+
+	/**
+	 * Creates a success response carrying the given heartbeat interval.
+	 *
+	 * @param heartbeatInterval the interval in which the ResourceManager will heartbeat the JobMaster
+	 */
+	public JobMasterRegistrationSuccess(long heartbeatInterval) {
+		this.heartbeatInterval = heartbeatInterval;
+	}
+
+	/**
+	 * Gets the interval in which the ResourceManager will heartbeat the JobMaster.
+	 *
+	 * @return the interval in which the ResourceManager will heartbeat the JobMaster
+	 */
+	public long getHeartbeatInterval() {
+		return heartbeatInterval;
+	}
+
+	/**
+	 * Returns the class name followed by the heartbeat interval in parentheses.
+	 */
+	@Override
+	public String toString() {
+		return "JobMasterRegistrationSuccess(" + heartbeatInterval + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 439e56b..7b8ec70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,23 +18,56 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+<<<<<<< HEAD
 import org.apache.flink.api.common.JobID;
+=======
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+>>>>>>> db98efb... rsourceManager registration with JobManager
 
 import java.io.Serializable;
+import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class groups the JobMasterGateway and the leader session ID of a registered job master.
+ */
 public class JobMasterRegistration implements Serializable {
-	private static final long serialVersionUID = 8411214999193765202L;
 
+<<<<<<< HEAD
 	private final String address;
 	private final JobID jobID;
 
 	public JobMasterRegistration(String address, JobID jobID) {
 		this.address = address;
 		this.jobID = jobID;
+=======
+	private static final long serialVersionUID = -2316627821716999527L;
+
+	private final JobMasterGateway jobMasterGateway;
+
+	private UUID jobMasterLeaderSessionID;
+
+	public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
+		this.jobMasterGateway = checkNotNull(jobMasterGateway);
+	}
+
+	public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) {
+		this.jobMasterGateway = checkNotNull(jobMasterGateway);
+		this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
+	}
+
+	public JobMasterGateway getJobMasterGateway() {
+		return jobMasterGateway;
+	}
+
+	/**
+	 * Updates the leader session ID recorded for the registered job master.
+	 *
+	 * @param leaderSessionID the new leader session ID of the job master
+	 */
+	public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
+		// store the argument; assigning the field to itself would silently drop the update
+		this.jobMasterLeaderSessionID = leaderSessionID;
+>>>>>>> db98efb... rsourceManager registration with JobManager
 	}
 
-	public String getAddress() {
-		return address;
+	public UUID getJobMasterLeaderSessionID() {
+		return jobMasterLeaderSessionID;
 	}
 
 	public JobID getJobID() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 5370710..8be1455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import akka.dispatch.Futures;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -25,15 +26,22 @@ import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+<<<<<<< HEAD
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+=======
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+>>>>>>> db98efb... rsourceManager registration with JobManager
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,15 +58,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact with the him remotely:
  * <ul>
- *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobMaster(UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
+<<<<<<< HEAD
 public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
 
 	private final Logger LOG = LoggerFactory.getLogger(getClass());
 
 	private final Map<JobID, JobMasterGateway> jobMasterGateways;
+=======
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+	/** Maps each JobID to the JobMasterRegistration of the job master registered for that job. */
+	private final Map<JobID, JobMasterRegistration> jobMasters;
+>>>>>>> db98efb... rsourceManager registration with JobManager
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -74,8 +88,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 			SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
+<<<<<<< HEAD
 		this.jobMasterGateways = new HashMap<>();
 		this.slotManager = slotManager;
+=======
+		this.jobMasters = new HashMap<>(16);
+>>>>>>> db98efb... rsourceManager registration with JobManager
 	}
 
 	@Override
@@ -95,8 +113,11 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 	public void shutDown() {
 		try {
 			leaderElectionService.stop();
+			for(JobID jobID : jobMasters.keySet()) {
+				highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+			}
 			super.shutDown();
-		} catch(Throwable e) {
+		} catch (Throwable e) {
 			log.error("A fatal error happened when shutdown the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
 		}
@@ -115,24 +136,58 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
-	 * @param jobMasterRegistration Job master registration information
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * @param jobMasterAddress        The address of the JobMaster that registers
+	 * @param jobID                   The Job ID of the JobMaster that registers
 	 * @return Future registration response
 	 */
 	@RpcMethod
+<<<<<<< HEAD
 	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
 		final Future<JobMasterGateway> jobMasterFuture =
 			getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
 		final JobID jobID = jobMasterRegistration.getJobID();
+=======
+	public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {
+
+		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+			log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
+				jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
+			return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+		}
+
+		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
+>>>>>>> db98efb... rsourceManager registration with JobManager
 
 		return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
 			@Override
 			public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
+<<<<<<< HEAD
 				final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
 				if (existingGateway != null) {
 					LOG.info("Replacing existing gateway {} for JobID {} with  {}.",
 						existingGateway, jobID, jobMasterGateway);
 				}
 				return new RegistrationResponse(true);
+=======
+				if (jobMasters.containsKey(jobID)) {
+					JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID());
+					jobMasters.put(jobID, jobMasterRegistration);
+					log.info("Replacing gateway for registered JobID {}.", jobID);
+				} else {
+					JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
+					jobMasters.put(jobID, jobMasterRegistration);
+					try {
+						highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID));
+					} catch(Throwable e) {
+						log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster",
+							jobID, jobMasterAddress);
+						return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster");
+					}
+				}
+
+				return new JobMasterRegistrationSuccess(5000);
+>>>>>>> db98efb... rsourceManager registration with JobManager
 			}
 		}, getMainThreadExecutor());
 	}
@@ -158,26 +213,41 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 
 
 	/**
-	 *
-	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
-	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
-	 * @param resourceID               The resource ID of the TaskExecutor that registers
-	 *
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
+	 * @param resourceID              The resource ID of the TaskExecutor that registers
 	 * @return The response by the ResourceManager.
 	 */
 	@RpcMethod
-	public org.apache.flink.runtime.registration.RegistrationResponse registerTaskExecutor(
-			UUID resourceManagerLeaderId,
-			String taskExecutorAddress,
-			ResourceID resourceID) {
+	public RegistrationResponse registerTaskExecutor(
+		UUID resourceManagerLeaderId,
+		String taskExecutorAddress,
+		ResourceID resourceID) {
 
 		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
 	}
 
 
+<<<<<<< HEAD
 	// ------------------------------------------------------------------------
 	//  Leader Contender
 	// ------------------------------------------------------------------------
+=======
+		/**
+		 * Callback method invoked when the current ResourceManager loses leadership.
+		 */
+		@Override
+		public void revokeLeadership() {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("ResourceManager {} was revoked leadership.", getAddress());
+					jobMasters.clear();
+					leaderSessionID = null;
+				}
+			});
+		}
+>>>>>>> db98efb... rsourceManager registration with JobManager
 
 	/**
 	 * Callback method when current resourceManager is granted leadership
@@ -232,4 +302,35 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 			}
 		});
 	}
+
+	/**
+	 * Listener that records the leader session ID of the JobMaster responsible for one job
+	 * whenever the leader retrieval service reports a new leader.
+	 */
+	private class JobMasterLeaderListener implements LeaderRetrievalListener {
+		/** The job whose JobMaster leader this listener tracks. */
+		private final JobID jobID;
+
+		private JobMasterLeaderListener(JobID jobID) {
+			this.jobID = jobID;
+		}
+
+		/**
+		 * Stores the reported leader session ID in the registration of this listener's job.
+		 * The update is submitted through {@code runAsync}. If no registration exists for the
+		 * job any more, the notification is logged and ignored.
+		 *
+		 * @param leaderAddress   the address of the new JobMaster leader (only logged here)
+		 * @param leaderSessionID the session ID of the new JobMaster leader
+		 */
+		@Override
+		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID);
+					JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID);
+					if (jobMasterRegistration == null) {
+						// the registrations are cleared when leadership is revoked, so the entry may be gone
+						log.warn("Ignoring leader notification for JobMaster {} because no registration exists for this job.", jobID);
+						return;
+					}
+					// update job master leader session id
+					jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
+				}
+			});
+		}
+
+		/**
+		 * Logs an error reported by the leader retrieval service; no other action is taken.
+		 *
+		 * @param exception the error reported by the leader retrieval service
+		 */
+		@Override
+		public void handleError(final Exception exception) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception);
+				}
+			});
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 5c8786c..1ee11a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 
 import java.util.UUID;
 
@@ -35,21 +36,18 @@ public interface ResourceManagerGateway extends RpcGateway {
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
-	 * @param jobMasterRegistration Job master registration information
-	 * @param timeout Timeout for the future to complete
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * @param jobMasterAddress        The address of the JobMaster that registers
+	 * @param jobID                   The Job ID of the JobMaster that registers
+	 * @param timeout                 Timeout for the future to complete
 	 * @return Future registration response
 	 */
 	Future<RegistrationResponse> registerJobMaster(
-		JobMasterRegistration jobMasterRegistration,
-		@RpcTimeout Time timeout);
+		UUID resourceManagerLeaderId,
+		String jobMasterAddress,
+		JobID jobID,
+				@RpcTimeout Time timeout);
 
-	/**
-	 * Register a {@link JobMaster} at the resource manager.
-	 *
-	 * @param jobMasterRegistration Job master registration information
-	 * @return Future registration response
-	 */
-	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
 
 	/**
 	 * Requests a slot from the resource manager.
@@ -60,15 +58,13 @@ public interface ResourceManagerGateway extends RpcGateway {
 	Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
 
 	/**
-	 * 
-	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
-	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
-	 * @param resourceID               The resource ID of the TaskExecutor that registers
-	 * @param timeout                  The timeout for the response.
-	 * 
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
+	 * @param resourceID              The resource ID of the TaskExecutor that registers
+	 * @param timeout                 The timeout for the response.
 	 * @return The future to the response by the ResourceManager.
 	 */
-	Future<org.apache.flink.runtime.registration.RegistrationResponse> registerTaskExecutor(
+	Future<RegistrationResponse> registerTaskExecutor(
 			UUID resourceManagerLeaderId,
 			String taskExecutorAddress,
 			ResourceID resourceID,

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
new file mode 100644
index 0000000..cd14a0d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An exception specifying that the received leader session ID is not the same as expected.
+ */
+public class LeaderSessionIDException extends Exception {
+
+	private static final long serialVersionUID = -3276145308053264636L;
+
+	/** expected leader session id */
+	private final UUID expectedLeaderSessionID;
+
+	/** actual leader session id */
+	private final UUID actualLeaderSessionID;
+
+	/**
+	 * Creates the exception with a message that names both session IDs.
+	 *
+	 * <p>Both arguments are passed through {@code checkNotNull}, so callers must supply
+	 * non-null IDs (presumably a NullPointerException is raised otherwise -- TODO confirm
+	 * against Preconditions).
+	 *
+	 * @param expectedLeaderSessionID the leader session ID the receiver expected
+	 * @param actualLeaderSessionID   the leader session ID that was actually received
+	 */
+	public LeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
+		super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
+		this.expectedLeaderSessionID =  checkNotNull(expectedLeaderSessionID);
+		this.actualLeaderSessionID = checkNotNull(actualLeaderSessionID);
+	}
+
+	/**
+	 * Gets the expected leader session ID.
+	 *
+	 * @return the expected leader session ID
+	 */
+	public UUID getExpectedLeaderSessionID() {
+		return expectedLeaderSessionID;
+	}
+
+	/**
+	 * Gets the actual leader session ID.
+	 *
+	 * @return the actual leader session ID
+	 */
+	public UUID getActualLeaderSessionID() {
+		return actualLeaderSessionID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d84a6a9..cf709c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -327,6 +327,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			}
 
 			@Override
+			public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+				return null;
+			}
+
+			@Override
 			public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
 				return null;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3162f40..2ac43be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
@@ -30,6 +31,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
 
+	private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
+
 	private volatile LeaderElectionService jobMasterLeaderElectionService;
 
 	private volatile LeaderElectionService resourceManagerLeaderElectionService;
@@ -43,6 +46,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
 	}
 
+	public void setJobMasterLeaderRetriever(JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
+		this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
+	}
+
 	public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
 		this.jobMasterLeaderElectionService = leaderElectionService;
 	}
@@ -66,6 +73,16 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
+	public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
+		if (service != null) {
+			return service;
+		} else {
+			throw new IllegalStateException("JobMasterLeaderRetriever has not been set");
+		}
+	}
+
+	@Override
 	public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
 		LeaderElectionService service = jobMasterLeaderElectionService;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
new file mode 100644
index 0000000..4d04001
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class ResourceManagerTest {
+
+	private TestingSerialRpcService rpcService;
+
+	@Before
+	public void setup() throws Exception {
+		rpcService = new TestingSerialRpcService();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		rpcService.stopService();
+	}
+
+	/**
+	 * Tests that a regular registration from a job master is answered with a JobMasterRegistrationSuccess.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testRegisterJobMaster() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// test response successful
+		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, jobID);
+		RegistrationResponse response = Await.result(successfulFuture, new FiniteDuration(0, TimeUnit.SECONDS));
+		assertTrue(response instanceof JobMasterRegistrationSuccess);
+	}
+
+	/**
+	 * Tests that a job master registration carrying a non-matching leader session ID fails with a LeaderSessionIDException.
+	 *
+	 * @throws Exception
+	 */
+	@Test(expected = LeaderSessionIDException.class)
+	public void testRegisterJobMasterWithUnmatchedLeaderSessionId() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// registering with a leader session ID that differs from the granted one must fail
+		UUID differentLeaderSessionID = UUID.randomUUID();
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jobMasterAddress, jobID);
+		Await.result(unMatchedLeaderFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
+	}
+
+	/**
+	 * Tests that a job master registration from an address without a registered gateway fails with an exception.
+	 *
+	 * @throws Exception
+	 */
+	@Test(expected = Exception.class)
+	public void testRegisterJobMasterFromInvalidAddress() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// registering from an address that has no registered job master gateway must fail
+		String invalidAddress = "/jobMasterAddress2";
+		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(leaderSessionId, invalidAddress, jobID);
+		Await.result(invalidAddressFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
+	}
+
+	/**
+	 * Verifies that RegistrationResponse.Decline is returned when the job master leader retrieval listener cannot be started.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		JobID unknownJobIDToHAServices = new JobID();
+		// a registration must be declined when the job master leader retrieval listener cannot be started
+		Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
+		RegistrationResponse response = Await.result(declineFuture, new FiniteDuration(0, TimeUnit.SECONDS));
+		assertTrue(response instanceof RegistrationResponse.Decline);
+	}
+
+	private JobID mockJobMaster(String jobMasterAddress) {
+		JobID jobID = new JobID();
+		JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+		rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
+		return jobID;
+	}
+
+	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
+		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+		resourceManager.start();
+		return resourceManager;
+	}
+
+	private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) {
+		UUID leaderSessionId = UUID.randomUUID();
+		resourceManagerLeaderElectionService.isLeader(leaderSessionId);
+		return leaderSessionId;
+	}
+
+}


[3/4] flink git commit: [FLINK-4535] [cluster management] resourceManager process the registration from TaskExecutor

Posted by mx...@apache.org.
[FLINK-4535] [cluster management] resourceManager process the registration from TaskExecutor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a31cf004
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a31cf004
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a31cf004

Branch: refs/heads/flip-6
Commit: a31cf00438e018d05c5890af142d00b4a8b51b78
Parents: b905fdb
Author: beyond1920 <be...@126.com>
Authored: Thu Sep 1 11:14:00 2016 +0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 21 21:57:04 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/RegistrationResponse.java   |  36 ---
 .../resourcemanager/ResourceManager.java        | 288 ++++++++-----------
 .../resourcemanager/ResourceManagerGateway.java |  45 +--
 .../TaskExecutorRegistration.java               |  51 ++++
 .../exceptions/LeaderSessionIDException.java    |   1 +
 .../resourcemanager/ResourceManagerTest.java    | 119 ++++----
 6 files changed, 241 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
deleted file mode 100644
index 796e634..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import java.io.Serializable;
-
-public class RegistrationResponse implements Serializable {
-	private static final long serialVersionUID = -2379003255993119993L;
-
-	private final boolean isSuccess;
-
-	public RegistrationResponse(boolean isSuccess) {
-		this.isSuccess = isSuccess;
-	}
-
-	public boolean isSuccess() {
-		return isSuccess;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index aae4874..15692b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,41 +18,29 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import akka.dispatch.Futures;
+import akka.dispatch.Mapper;
+
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-
-import org.apache.flink.runtime.concurrent.Future;
-
-import org.apache.flink.runtime.util.LeaderConnectionInfo;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.Future;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -62,35 +50,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact with the him remotely:
  * <ul>
- *     <li>{@link #registerJobMaster(UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
-
-	private final Logger LOG = LoggerFactory.getLogger(getClass());
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
 
-	private final Map<JobID, JobMasterGateway> jobMasterGateways;
-
-	private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
+	/** Maps the ResourceID of each registered TaskExecutor to its TaskExecutorRegistration. */
+	private final Map<ResourceID, TaskExecutorRegistration>  startedTaskExecutorGateways;
 
 	private final HighAvailabilityServices highAvailabilityServices;
+	private LeaderElectionService leaderElectionService = null;
+	private UUID leaderSessionID = null;
 
-	private LeaderElectionService leaderElectionService;
-
-	private final SlotManager slotManager;
-
-	private UUID leaderSessionID;
-
-	public ResourceManager(
-			RpcService rpcService,
-			HighAvailabilityServices highAvailabilityServices,
-			SlotManager slotManager) {
+	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
-		this.jobMasterGateways = new HashMap<>();
-		this.slotManager = slotManager;
-		this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+		this.jobMasterGateways = new HashMap<>(16);
+		this.startedTaskExecutorGateways = new HashMap<>(16);
 	}
 
 	@Override
@@ -99,7 +77,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 		try {
 			super.start();
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
-			leaderElectionService.start(this);
+			leaderElectionService.start(new ResourceManagerLeaderContender());
 		} catch (Throwable e) {
 			log.error("A fatal error happened when starting the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -110,11 +88,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 	public void shutDown() {
 		try {
 			leaderElectionService.stop();
-			for(JobID jobID : jobMasterGateways.keySet()) {
-				highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
-			}
 			super.shutDown();
-		} catch (Throwable e) {
+		} catch(Throwable e) {
 			log.error("A fatal error happened when shutdown the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
 		}
@@ -127,78 +102,34 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 	 */
 	@VisibleForTesting
 	UUID getLeaderSessionID() {
-		return this.leaderSessionID;
+		return leaderSessionID;
 	}
 
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-	 * @param jobMasterAddress        The address of the JobMaster that registers
-	 * @param jobID                   The Job ID of the JobMaster that registers
+	 * @param jobMasterRegistration Job master registration information
 	 * @return Future registration response
 	 */
 	@RpcMethod
-	public Future<RegistrationResponse> registerJobMaster(
-		final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
-		final String jobMasterAddress, final JobID jobID) {
-
-		checkNotNull(resourceManagerLeaderId);
-		checkNotNull(jobMasterAddress);
-		checkNotNull(jobID);
+	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
 
-		// TODO mxm The leader retrieval needs to be split up in an async part which runs outside the main execution thread
-		// The state updates should be performed inside the main thread
-
-		final FlinkCompletableFuture<RegistrationResponse> future = new FlinkCompletableFuture<>();
-
-		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
-			log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
-					" did not equal the received leader session ID  {}",
-				jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
-			future.complete(new RegistrationResponse.Decline("Invalid leader session id"));
-			return future;
-		}
-
-		final LeaderConnectionInfo jobMasterLeaderInfo;
-		try {
-			jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-				highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
-		} catch (Exception e) {
-			LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
-			future.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
-			return future;
-		}
-
-		if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
-			LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
-			future.complete(new RegistrationResponse.Decline("JobManager is not leading"));
-			return future;
-		}
-
-		Future<JobMasterGateway> jobMasterGatewayFuture =
-			getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
-
-		return jobMasterGatewayFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
+		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
 			@Override
-			public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
-
-				final JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
-				try {
-					LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
-					jobMasterLeaderRetriever.start(jobMasterLeaderListener);
-				} catch (Exception e) {
-					LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
-					return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
-				}
-				jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
-				final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
-				if (existingGateway != null) {
-					log.info("Replacing gateway for registered JobID {}.", jobID);
+			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
+				InstanceID instanceID;
+
+				if (jobMasterGateways.containsKey(jobMasterGateway)) {
+					instanceID = jobMasterGateways.get(jobMasterGateway);
+				} else {
+					instanceID = new InstanceID();
+					jobMasterGateways.put(jobMasterGateway, instanceID);
 				}
-				return new JobMasterRegistrationSuccess(5000);
+
+				return new TaskExecutorRegistrationSuccess(instanceID, 5000);
 			}
-		}, getMainThreadExecutor());
+		}, getMainThreadExecutionContext());
 	}
 
 	/**
@@ -208,104 +139,111 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
 	 * @return Slot assignment
 	 */
 	@RpcMethod
-	public SlotRequestReply requestSlot(SlotRequest slotRequest) {
-		final JobID jobId = slotRequest.getJobId();
-		final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
-
-		if (jobMasterGateway != null) {
-			return slotManager.requestSlot(slotRequest);
-		} else {
-			LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
-			return new SlotRequestRejected(slotRequest.getAllocationId());
-		}
+	public SlotAssignment requestSlot(SlotRequest slotRequest) {
+		System.out.println("SlotRequest: " + slotRequest);
+		return new SlotAssignment();
 	}
 
 
 	/**
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
-	 * @param resourceID              The resource ID of the TaskExecutor that registers
+	 * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager
+	 *
+	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
+	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
+	 * @param resourceID               The resource ID of the TaskExecutor that registers
+	 *
 	 * @return The response by the ResourceManager.
 	 */
 	@RpcMethod
-	public RegistrationResponse registerTaskExecutor(
-		UUID resourceManagerLeaderId,
-		String taskExecutorAddress,
-		ResourceID resourceID) {
+	public Future<RegistrationResponse> registerTaskExecutor(
+		final UUID resourceManagerLeaderId,
+		final String taskExecutorAddress,
+		final ResourceID resourceID) {
 
-		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
-	}
+		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
+				resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+			return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+		}
 
+		Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
 
-	// ------------------------------------------------------------------------
-	//  Leader Contender
-	// ------------------------------------------------------------------------
+		return taskExecutorGatewayFuture.map(new Mapper<TaskExecutorGateway, RegistrationResponse>() {
 
-	/**
-	 * Callback method when current resourceManager is granted leadership
-	 *
-	 * @param leaderSessionID unique leadershipID
-	 */
-	@Override
-	public void grantLeadership(final UUID leaderSessionID) {
-		runAsync(new Runnable() {
 			@Override
-			public void run() {
-				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-				// confirming the leader session ID might be blocking,
-				leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-				// notify SlotManager
-				slotManager.setLeaderUUID(leaderSessionID);
-				ResourceManager.this.leaderSessionID = leaderSessionID;
-			}
-		});
-	}
+			public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
+				InstanceID instanceID = null;
+				TaskExecutorRegistration taskExecutorRegistration = startedTaskExecutorGateways.get(resourceID);
+				if(taskExecutorRegistration != null) {
+					log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+					instanceID = taskExecutorRegistration.getInstanceID();
+				} else {
+					instanceID = new InstanceID();
+					startedTaskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, instanceID));
+				}
 
-	/**
-	 * Callback method when current resourceManager lose leadership.
-	 */
-	@Override
-	public void revokeLeadership() {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.info("ResourceManager {} was revoked leadership.", getAddress());
-				jobMasterGateways.clear();
-				slotManager.clearState();
-				leaderSessionID = null;
+				return new TaskExecutorRegistrationSuccess(instanceID, 5000);
 			}
-		});
+		}, getMainThreadExecutionContext());
 	}
 
-	/**
-	 * Handles error occurring in the leader election service
-	 *
-	 * @param exception Exception being thrown in the leader election service
-	 */
-	@Override
-	public void handleError(final Exception exception) {
-		log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-		// terminate ResourceManager in case of an error
-		shutDown();
-	}
 
-	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
+	private class ResourceManagerLeaderContender implements LeaderContender {
 
-		private final JobID jobID;
-		private UUID leaderID;
+		/**
+		 * Callback method when current resourceManager is granted leadership
+		 *
+		 * @param leaderSessionID unique leadershipID
+		 */
+		@Override
+		public void grantLeadership(final UUID leaderSessionID) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+					ResourceManager.this.leaderSessionID = leaderSessionID;
+					// confirming the leader session ID might be blocking,
+					leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+				}
+			});
+		}
 
-		private JobMasterLeaderListener(JobID jobID) {
-			this.jobID = jobID;
+		/**
+		 * Callback method invoked when the current ResourceManager loses leadership.
+		 */
+		@Override
+		public void revokeLeadership() {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("ResourceManager {} was revoked leadership.", getAddress());
+					jobMasterGateways.clear();
+					startedTaskExecutorGateways.clear();
+					leaderSessionID = null;
+				}
+			});
 		}
 
 		@Override
-		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
-			this.leaderID = leaderSessionID;
+		public String getAddress() {
+			return ResourceManager.this.getAddress();
 		}
 
+		/**
+		 * Handles error occurring in the leader election service
+		 *
+		 * @param exception Exception being thrown in the leader election service
+		 */
 		@Override
 		public void handleError(final Exception exception) {
-			// TODO
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+					// terminate ResourceManager in case of an error
+					shutDown();
+				}
+			});
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 1ee11a1..30a096f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
 
@@ -36,18 +37,21 @@ public interface ResourceManagerGateway extends RpcGateway {
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-	 * @param jobMasterAddress        The address of the JobMaster that registers
-	 * @param jobID                   The Job ID of the JobMaster that registers
-	 * @param timeout                 Timeout for the future to complete
+	 * @param jobMasterRegistration Job master registration information
+	 * @param timeout Timeout for the future to complete
 	 * @return Future registration response
 	 */
 	Future<RegistrationResponse> registerJobMaster(
-		UUID resourceManagerLeaderId,
-		String jobMasterAddress,
-		JobID jobID,
-				@RpcTimeout Time timeout);
+		JobMasterRegistration jobMasterRegistration,
+		@RpcTimeout FiniteDuration timeout);
 
+	/**
+	 * Register a {@link JobMaster} at the resource manager.
+	 *
+	 * @param jobMasterRegistration Job master registration information
+	 * @return Future registration response
+	 */
+	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
 
 	/**
 	 * Requests a slot from the resource manager.
@@ -55,18 +59,21 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest Slot request
 	 * @return Future slot assignment
 	 */
-	Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
+	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
 
 	/**
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
-	 * @param resourceID              The resource ID of the TaskExecutor that registers
-	 * @param timeout                 The timeout for the response.
+	 * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.
+	 *
+	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
+	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
+	 * @param resourceID               The resource ID of the TaskExecutor that registers
+	 * @param timeout                  The timeout for the response.
+	 *
 	 * @return The future to the response by the ResourceManager.
 	 */
 	Future<RegistrationResponse> registerTaskExecutor(
-			UUID resourceManagerLeaderId,
-			String taskExecutorAddress,
-			ResourceID resourceID,
-			@RpcTimeout Time timeout);
+		UUID resourceManagerLeaderId,
+		String taskExecutorAddress,
+		ResourceID resourceID,
+		@RpcTimeout FiniteDuration timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
new file mode 100644
index 0000000..bd78a47
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.io.Serializable;
+
+/**
+ * This class groups the TaskExecutorGateway and the InstanceID of a registered task executor.
+ */
+public class TaskExecutorRegistration implements Serializable {
+
+	private static final long serialVersionUID = -2062957799469434614L;
+
+	private TaskExecutorGateway taskExecutorGateway;
+
+	private InstanceID instanceID;
+
+	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
+		InstanceID instanceID) {
+		this.taskExecutorGateway = taskExecutorGateway;
+		this.instanceID = instanceID;
+	}
+
+	public InstanceID getInstanceID() {
+		return instanceID;
+	}
+
+	public TaskExecutorGateway getTaskExecutorGateway() {
+		return taskExecutorGateway;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
index cd14a0d..d3ba9a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc.exceptions;
 
 import java.util.UUID;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a31cf004/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 4d04001..b75d9b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -18,15 +18,14 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,8 +36,9 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
 
 public class ResourceManagerTest {
 
@@ -55,105 +55,86 @@ public class ResourceManagerTest {
 	}
 
 	/**
-	 * Test receive normal registration from job master and receive duplicate registration from job master
+	 * Tests receiving a normal registration, followed by a duplicate registration, from a task executor.
 	 *
 	 * @throws Exception
 	 */
 	@Test
-	public void testRegisterJobMaster() throws Exception {
-		String jobMasterAddress = "/jobMasterAddress1";
-		JobID jobID = mockJobMaster(jobMasterAddress);
-		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
-		final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+	public void testRegisterTaskExecutor() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
+		final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
 
 		// test response successful
-		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, jobID);
+		Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
 		RegistrationResponse response = Await.result(successfulFuture, new FiniteDuration(0, TimeUnit.SECONDS));
-		assertTrue(response instanceof JobMasterRegistrationSuccess);
+		assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+		// test that a duplicate registration from the same taskExecutor succeeds and returns the previous registration ID
+		Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
+		RegistrationResponse duplicateResponse = Await.result(duplicateFuture, new FiniteDuration(0, TimeUnit.SECONDS));
+		assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
+		assertEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
 	}
 
 	/**
-	 * Test receive registration with unmatched leadershipId from job master
+	 * Tests receiving a registration with a non-matching leader session ID from a task executor.
 	 *
 	 * @throws Exception
 	 */
 	@Test(expected = LeaderSessionIDException.class)
-	public void testRegisterJobMasterWithUnmatchedLeaderSessionId() throws Exception {
-		String jobMasterAddress = "/jobMasterAddress1";
-		JobID jobID = mockJobMaster(jobMasterAddress);
-		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
-		final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-
-		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
+	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
+		final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
+
+		// test that an exception is thrown when a taskExecutor registers with a non-matching leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jobMasterAddress, jobID);
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
 		Await.result(unMatchedLeaderFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
 	}
 
 	/**
-	 * Test receive registration with invalid address from job master
+	 * Tests receiving a registration with an invalid address from a task executor.
 	 *
 	 * @throws Exception
 	 */
 	@Test(expected = Exception.class)
-	public void testRegisterJobMasterFromInvalidAddress() throws Exception {
-		String jobMasterAddress = "/jobMasterAddress1";
-		JobID jobID = mockJobMaster(jobMasterAddress);
-		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
-		final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-
-		// test throw exception when receive a registration from job master which takes invalid address
-		String invalidAddress = "/jobMasterAddress2";
-		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(leaderSessionId, invalidAddress, jobID);
+	public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
+		final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
+
+		// test that an exception is thrown when a taskExecutor registers with an invalid address
+		String invalidAddress = "/taskExecutor2";
+		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID);
 		Await.result(invalidAddressFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
 	}
 
-	/**
-	 * Check and verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
-	 *
-	 * @throws Exception
-	 */
-	@Test
-	public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
-		String jobMasterAddress = "/jobMasterAddress1";
-		JobID jobID = mockJobMaster(jobMasterAddress);
-		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
-		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
-		final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-
-		JobID unknownJobIDToHAServices = new JobID();
-		// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
-		Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
-		RegistrationResponse response = Await.result(declineFuture, new FiniteDuration(0, TimeUnit.SECONDS));
-		assertTrue(response instanceof RegistrationResponse.Decline);
-	}
-
-	private JobID mockJobMaster(String jobMasterAddress) {
-		JobID jobID = new JobID();
-		JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
-		rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
-		return jobID;
+	private ResourceID mockTaskExecutor(String taskExecutorAddress) {
+		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		ResourceID taskExecutorResourceID = ResourceID.generate();
+		rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
+		return taskExecutorResourceID;
 	}
 
-	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService leaderElectionService) {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
-		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
+		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
 		resourceManager.start();
 		return resourceManager;
 	}
 
-	private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) {
+	private UUID grantResourceManagerLeadership(TestingLeaderElectionService leaderElectionService) {
 		UUID leaderSessionId = UUID.randomUUID();
-		resourceManagerLeaderElectionService.isLeader(leaderSessionId);
+		leaderElectionService.isLeader(leaderSessionId);
 		return leaderSessionId;
 	}
 


[2/4] flink git commit: [FLINK-4537] rebase and refine

Posted by mx...@apache.org.
[FLINK-4537] rebase and refine


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b905fdbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b905fdbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b905fdbf

Branch: refs/heads/flip-6
Commit: b905fdbf07433158b8bf1c6d2b5a172482e3c418
Parents: 8b2271b
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 21 14:13:12 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 21 21:56:58 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/JobMasterRegistration.java  |  52 +++---
 .../resourcemanager/ResourceManager.java        | 165 ++++++++-----------
 .../slotmanager/SlotManager.java                |  29 +++-
 3 files changed, 110 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b905fdbf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 7b8ec70..981441f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,59 +18,47 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-<<<<<<< HEAD
 import org.apache.flink.api.common.JobID;
-=======
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
->>>>>>> db98efb... rsourceManager registration with JobManager
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 
-import java.io.Serializable;
 import java.util.UUID;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * This class is responsible for group the JobMasterGateway and the LeaderSessionID of a registered job master
  */
-public class JobMasterRegistration implements Serializable {
+public class JobMasterRegistration implements LeaderRetrievalListener {
 
-<<<<<<< HEAD
-	private final String address;
+	private final JobMasterGateway gateway;
 	private final JobID jobID;
+	private final UUID leaderSessionID;
+	private LeaderRetrievalListener retriever;
 
-	public JobMasterRegistration(String address, JobID jobID) {
-		this.address = address;
+	public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, UUID leaderSessionID) {
+		this.gateway = gateway;
 		this.jobID = jobID;
-=======
-	private static final long serialVersionUID = -2316627821716999527L;
-
-	private final JobMasterGateway jobMasterGateway;
-
-	private UUID jobMasterLeaderSessionID;
-
-	public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
-		this.jobMasterGateway = checkNotNull(jobMasterGateway);
+		this.leaderSessionID = leaderSessionID;
 	}
 
-	public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) {
-		this.jobMasterGateway = checkNotNull(jobMasterGateway);
-		this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
+	public JobMasterGateway getGateway() {
+		return gateway;
 	}
 
-	public JobMasterGateway getJobMasterGateway() {
-		return jobMasterGateway;
+	public UUID getLeaderSessionID() {
+		return leaderSessionID;
 	}
 
-	public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
-		this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
->>>>>>> db98efb... rsourceManager registration with JobManager
+	public JobID getJobID() {
+		return jobID;
 	}
 
-	public UUID getJobMasterLeaderSessionID() {
-		return jobMasterLeaderSessionID;
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+		
 	}
 
-	public JobID getJobID() {
-		return jobID;
+	@Override
+	public void handleError(Exception exception) {
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b905fdbf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8be1455..aae4874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.dispatch.Futures;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -29,26 +28,31 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-<<<<<<< HEAD
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-=======
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
->>>>>>> db98efb... rsourceManager registration with JobManager
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -62,17 +66,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-<<<<<<< HEAD
 public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
 
 	private final Logger LOG = LoggerFactory.getLogger(getClass());
 
 	private final Map<JobID, JobMasterGateway> jobMasterGateways;
-=======
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-	/** the mapping relationship of JobID and JobMasterGateway */
-	private final Map<JobID, JobMasterRegistration> jobMasters;
->>>>>>> db98efb... rsourceManager registration with JobManager
+
+	private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -88,12 +88,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 			SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
-<<<<<<< HEAD
 		this.jobMasterGateways = new HashMap<>();
 		this.slotManager = slotManager;
-=======
-		this.jobMasters = new HashMap<>(16);
->>>>>>> db98efb... rsourceManager registration with JobManager
+		this.jobMasterLeaderRetrievalListeners = new HashSet<>();
 	}
 
 	@Override
@@ -113,7 +110,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	public void shutDown() {
 		try {
 			leaderElectionService.stop();
-			for(JobID jobID : jobMasters.keySet()) {
+			for(JobID jobID : jobMasterGateways.keySet()) {
 				highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
 			}
 			super.shutDown();
@@ -142,52 +139,64 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 * @return Future registration response
 	 */
 	@RpcMethod
-<<<<<<< HEAD
-	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-		final Future<JobMasterGateway> jobMasterFuture =
-			getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
-		final JobID jobID = jobMasterRegistration.getJobID();
-=======
-	public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {
+	public Future<RegistrationResponse> registerJobMaster(
+		final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
+		final String jobMasterAddress, final JobID jobID) {
+
+		checkNotNull(resourceManagerLeaderId);
+		checkNotNull(jobMasterAddress);
+		checkNotNull(jobID);
+
+		// TODO mxm: The leader retrieval needs to be split into an async part which runs outside the main execution thread.
+		// The state updates should be performed inside the main thread.
+
+		final FlinkCompletableFuture<RegistrationResponse> future = new FlinkCompletableFuture<>();
 
 		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
-			log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
+			log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
+					" did not equal the received leader session ID  {}",
 				jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
-			return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+			future.complete(new RegistrationResponse.Decline("Invalid leader session id"));
+			return future;
 		}
 
-		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
->>>>>>> db98efb... rsourceManager registration with JobManager
+		final LeaderConnectionInfo jobMasterLeaderInfo;
+		try {
+			jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+				highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
+		} catch (Exception e) {
+			LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+			future.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+			return future;
+		}
+
+		if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
+			LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
+			future.complete(new RegistrationResponse.Decline("JobManager is not leading"));
+			return future;
+		}
 
-		return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
+		Future<JobMasterGateway> jobMasterGatewayFuture =
+			getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
+
+		return jobMasterGatewayFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
 			@Override
 			public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
-<<<<<<< HEAD
+
+				final JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+				try {
+					LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+					jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+				} catch (Exception e) {
+					LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+					return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+				}
+				jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
 				final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
 				if (existingGateway != null) {
-					LOG.info("Replacing existing gateway {} for JobID {} with  {}.",
-						existingGateway, jobID, jobMasterGateway);
-				}
-				return new RegistrationResponse(true);
-=======
-				if (jobMasters.containsKey(jobID)) {
-					JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID());
-					jobMasters.put(jobID, jobMasterRegistration);
 					log.info("Replacing gateway for registered JobID {}.", jobID);
-				} else {
-					JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
-					jobMasters.put(jobID, jobMasterRegistration);
-					try {
-						highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID));
-					} catch(Throwable e) {
-						log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster",
-							jobID, jobMasterAddress);
-						return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster");
-					}
 				}
-
 				return new JobMasterRegistrationSuccess(5000);
->>>>>>> db98efb... rsourceManager registration with JobManager
 			}
 		}, getMainThreadExecutor());
 	}
@@ -228,26 +237,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	}
 
 
-<<<<<<< HEAD
 	// ------------------------------------------------------------------------
 	//  Leader Contender
 	// ------------------------------------------------------------------------
-=======
-		/**
-		 * Callback method when current resourceManager lose leadership.
-		 */
-		@Override
-		public void revokeLeadership() {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was revoked leadership.", getAddress());
-					jobMasters.clear();
-					leaderSessionID = null;
-				}
-			});
-		}
->>>>>>> db98efb... rsourceManager registration with JobManager
 
 	/**
 	 * Callback method when current resourceManager is granted leadership
@@ -263,7 +255,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 				// confirming the leader session ID might be blocking,
 				leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 				// notify SlotManager
-				slotManager.notifyLeaderAddress(getAddress(), leaderSessionID);
+				slotManager.setLeaderUUID(leaderSessionID);
 				ResourceManager.this.leaderSessionID = leaderSessionID;
 			}
 		});
@@ -279,7 +271,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 			public void run() {
 				log.info("ResourceManager {} was revoked leadership.", getAddress());
 				jobMasterGateways.clear();
-				ResourceManager.this.leaderSessionID = null;
+				slotManager.clearState();
+				leaderSessionID = null;
 			}
 		});
 	}
@@ -291,20 +284,15 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 */
 	@Override
 	public void handleError(final Exception exception) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-				// notify SlotManager
-				slotManager.handleError(exception);
-				// terminate ResourceManager in case of an error
-				shutDown();
-			}
-		});
+		log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+		// terminate ResourceManager in case of an error
+		shutDown();
 	}
 
-	private class JobMasterLeaderListener implements LeaderRetrievalListener {
+	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
+
 		private final JobID jobID;
+		private UUID leaderID;
 
 		private JobMasterLeaderListener(JobID jobID) {
 			this.jobID = jobID;
@@ -312,25 +300,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID);
-					// update job master leader session id
-					JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID);
-					jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
-				}
-			});
+			this.leaderID = leaderSessionID;
 		}
 
 		@Override
 		public void handleError(final Exception exception) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception);
-				}
-			});
+			// TODO
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b905fdbf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 97176b2..5d0013c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * </ul>
  * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
  */
-public abstract class SlotManager implements LeaderRetrievalListener {
+public abstract class SlotManager {
 
 	protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
@@ -514,22 +514,33 @@ public abstract class SlotManager implements LeaderRetrievalListener {
 		public int size() {
 			return allocatedSlots.size();
 		}
+
+		public void clear() {
+			allocatedSlots.clear();
+			allocatedSlotsByAllocationId.clear();
+		}
+	}
+
+	/**
+	 * Clears the state of the SlotManager after leadership revocation
+	 */
+	public void clearState() {
+		taskManagerGateways.clear();
+		registeredSlots.clear();
+		pendingSlotRequests.clear();
+		freeSlots.clear();
+		allocationMap.clear();
+		leaderID = null;
 	}
 
 	// ------------------------------------------------------------------------
-	//  High availability
+	//  High availability (called by the ResourceManager)
 	// ------------------------------------------------------------------------
 
-	@Override
-	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+	public void setLeaderUUID(UUID leaderSessionID) {
 		this.leaderID = leaderSessionID;
 	}
 
-	@Override
-	public void handleError(Exception exception) {
-		LOG.error("Slot Manager received an error from the leader service", exception);
-	}
-
 	// ------------------------------------------------------------------------
 	//  Testing utilities
 	// ------------------------------------------------------------------------


[4/4] flink git commit: [FLINK-4535] rebase and refine

Posted by mx...@apache.org.
[FLINK-4535] rebase and refine


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4076bd74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4076bd74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4076bd74

Branch: refs/heads/flip-6
Commit: 4076bd748f325a1b9c5342b1a214ccf4d15660d1
Parents: a31cf00
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 21 20:20:25 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Sep 21 21:58:33 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/JobMasterRegistration.java  |  64 ----
 .../resourcemanager/ResourceManager.java        | 322 ++++++++++++-------
 .../resourcemanager/ResourceManagerGateway.java |  36 +--
 .../TaskExecutorRegistration.java               |   2 +-
 .../slotmanager/SlotManager.java                |   1 -
 .../ResourceManagerJobMasterTest.java           | 174 ++++++++++
 .../ResourceManagerTaskExecutorTest.java        | 135 ++++++++
 .../resourcemanager/ResourceManagerTest.java    | 141 --------
 .../slotmanager/SlotProtocolTest.java           |  43 ++-
 9 files changed, 574 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
deleted file mode 100644
index 981441f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-
-import java.util.UUID;
-
-/**
- * This class is responsible for group the JobMasterGateway and the LeaderSessionID of a registered job master
- */
-public class JobMasterRegistration implements LeaderRetrievalListener {
-
-	private final JobMasterGateway gateway;
-	private final JobID jobID;
-	private final UUID leaderSessionID;
-	private LeaderRetrievalListener retriever;
-
-	public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, UUID leaderSessionID) {
-		this.gateway = gateway;
-		this.jobID = jobID;
-		this.leaderSessionID = leaderSessionID;
-	}
-
-	public JobMasterGateway getGateway() {
-		return gateway;
-	}
-
-	public UUID getLeaderSessionID() {
-		return leaderSessionID;
-	}
-
-	public JobID getJobID() {
-		return jobID;
-	}
-
-	@Override
-	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-		
-	}
-
-	@Override
-	public void handleError(Exception exception) {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 15692b6..88b8a11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,29 +18,41 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import scala.concurrent.Future;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -50,25 +62,38 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact with the him remotely:
  * <ul>
- *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
+
+	private final Logger LOG = LoggerFactory.getLogger(getClass());
 
-	/** ResourceID and TaskExecutorRegistration mapping relationship of registered taskExecutors */
-	private final Map<ResourceID, TaskExecutorRegistration>  startedTaskExecutorGateways;
+	private final Map<JobID, JobMasterGateway> jobMasterGateways;
+
+	private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
+
+	private final Map<ResourceID, TaskExecutorRegistration> taskExecutorGateways;
 
 	private final HighAvailabilityServices highAvailabilityServices;
-	private LeaderElectionService leaderElectionService = null;
-	private UUID leaderSessionID = null;
 
-	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
+	private LeaderElectionService leaderElectionService;
+
+	private final SlotManager slotManager;
+
+	private UUID leaderSessionID;
+
+	public ResourceManager(
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
-		this.jobMasterGateways = new HashMap<>(16);
-		this.startedTaskExecutorGateways = new HashMap<>(16);
+		this.jobMasterGateways = new HashMap<>();
+		this.slotManager = slotManager;
+		this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+		this.taskExecutorGateways = new HashMap<>();
 	}
 
 	@Override
@@ -77,7 +102,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 		try {
 			super.start();
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
-			leaderElectionService.start(new ResourceManagerLeaderContender());
+			leaderElectionService.start(this);
 		} catch (Throwable e) {
 			log.error("A fatal error happened when starting the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -88,8 +113,11 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	public void shutDown() {
 		try {
 			leaderElectionService.stop();
+			for(JobID jobID : jobMasterGateways.keySet()) {
+				highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+			}
 			super.shutDown();
-		} catch(Throwable e) {
+		} catch (Throwable e) {
 			log.error("A fatal error happened when shutdown the ResourceManager", e);
 			throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
 		}
@@ -102,48 +130,79 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 */
 	@VisibleForTesting
 	UUID getLeaderSessionID() {
-		return leaderSessionID;
+		return this.leaderSessionID;
 	}
 
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
-	 * @param jobMasterRegistration Job master registration information
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * @param jobMasterAddress        The address of the JobMaster that registers
+	 * @param jobID                   The Job ID of the JobMaster that registers
 	 * @return Future registration response
 	 */
 	@RpcMethod
-	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+	public Future<RegistrationResponse> registerJobMaster(
+		final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
+		final String jobMasterAddress, final JobID jobID) {
 
-		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
-			@Override
-			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
-				InstanceID instanceID;
+		checkNotNull(jobMasterAddress);
+		checkNotNull(jobID);
 
-				if (jobMasterGateways.containsKey(jobMasterGateway)) {
-					instanceID = jobMasterGateways.get(jobMasterGateway);
-				} else {
-					instanceID = new InstanceID();
-					jobMasterGateways.put(jobMasterGateway, instanceID);
-				}
+		return getRpcService()
+			.execute(new Callable<JobMasterGateway>() {
+				@Override
+				public JobMasterGateway call() throws Exception {
 
-				return new TaskExecutorRegistrationSuccess(instanceID, 5000);
-			}
-		}, getMainThreadExecutionContext());
-	}
+					if (!leaderSessionID.equals(resourceManagerLeaderId)) {
+						log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
+								" did not equal the received leader session ID  {}",
+							jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
+						throw new Exception("Invalid leader session id");
+					}
 
-	/**
-	 * Requests a slot from the resource manager.
-	 *
-	 * @param slotRequest Slot request
-	 * @return Slot assignment
-	 */
-	@RpcMethod
-	public SlotAssignment requestSlot(SlotRequest slotRequest) {
-		System.out.println("SlotRequest: " + slotRequest);
-		return new SlotAssignment();
-	}
+					final LeaderConnectionInfo jobMasterLeaderInfo;
+					try {
+						jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+							highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
+					} catch (Exception e) {
+						LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+						throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
+					}
+
+					if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
+						LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
+						throw new Exception("JobManager is not leading");
+					}
 
+					return getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, TimeUnit.SECONDS);
+				}
+			})
+			.handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>() {
+				@Override
+				public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable throwable) {
+					
+					if (throwable != null) {
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					} else {
+						JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+						try {
+							LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+							jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+						} catch (Exception e) {
+							LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+							return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+						}
+						jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
+						final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
+						if (existingGateway != null) {
+							log.info("Replacing gateway for registered JobID {}.", jobID);
+						}
+						return new JobMasterRegistrationSuccess(5000);
+					}
+				}
+			}, getMainThreadExecutor());
+	}
 
 	/**
 	 * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager
@@ -160,90 +219,129 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 		final String taskExecutorAddress,
 		final ResourceID resourceID) {
 
-		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
-			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
-				resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
-			return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+		return getRpcService().execute(new Callable<TaskExecutorGateway>() {
+			@Override
+			public TaskExecutorGateway call() throws Exception {
+				if (!leaderSessionID.equals(resourceManagerLeaderId)) {
+					log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " +
+							"not equal the received leader session ID  {}",
+						resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+					throw new Exception("Invalid leader session id");
+				}
+
+				return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
+			}
+		}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
+			@Override
+			public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+				if (throwable != null) {
+					return new RegistrationResponse.Decline(throwable.getMessage());
+				} else {
+					InstanceID id = new InstanceID();
+					TaskExecutorRegistration oldTaskExecutor =
+						taskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, id));
+					if (oldTaskExecutor != null) {
+						log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+					}
+					return new TaskExecutorRegistrationSuccess(id, 5000);
+				}
+			}
+		}, getMainThreadExecutor());
+	}
+
+	/**
+	 * Requests a slot from the resource manager.
+	 *
+	 * @param slotRequest Slot request
+	 * @return Slot assignment
+	 */
+	@RpcMethod
+	public SlotRequestReply requestSlot(SlotRequest slotRequest) {
+		final JobID jobId = slotRequest.getJobId();
+		final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+
+		if (jobMasterGateway != null) {
+			return slotManager.requestSlot(slotRequest);
+		} else {
+			LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+			return new SlotRequestRejected(slotRequest.getAllocationId());
 		}
+	}
 
-		Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
 
-		return taskExecutorGatewayFuture.map(new Mapper<TaskExecutorGateway, RegistrationResponse>() {
 
+
+	// ------------------------------------------------------------------------
+	//  Leader Contender
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Callback method when current resourceManager is granted leadership
+	 *
+	 * @param leaderSessionID unique leadershipID
+	 */
+	@Override
+	public void grantLeadership(final UUID leaderSessionID) {
+		runAsync(new Runnable() {
 			@Override
-			public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
-				InstanceID instanceID = null;
-				TaskExecutorRegistration taskExecutorRegistration = startedTaskExecutorGateways.get(resourceID);
-				if(taskExecutorRegistration != null) {
-					log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
-					instanceID = taskExecutorRegistration.getInstanceID();
-				} else {
-					instanceID = new InstanceID();
-					startedTaskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, instanceID));
-				}
+			public void run() {
+				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+				// confirming the leader session ID might be blocking,
+				leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+				// notify SlotManager
+				slotManager.setLeaderUUID(leaderSessionID);
+				ResourceManager.this.leaderSessionID = leaderSessionID;
+			}
+		});
+	}
 
-				return new TaskExecutorRegistrationSuccess(instanceID, 5000);
+	/**
+	 * Callback method when current resourceManager loses leadership.
+	 */
+	@Override
+	public void revokeLeadership() {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.info("ResourceManager {} was revoked leadership.", getAddress());
+				jobMasterGateways.clear();
+				taskExecutorGateways.clear();
+				slotManager.clearState();
+				leaderSessionID = null;
 			}
-		}, getMainThreadExecutionContext());
+		});
 	}
 
+	/**
+	 * Handles error occurring in the leader election service
+	 *
+	 * @param exception Exception being thrown in the leader election service
+	 */
+	@Override
+	public void handleError(final Exception exception) {
+		log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+		// terminate ResourceManager in case of an error
+		shutDown();
+	}
 
-	private class ResourceManagerLeaderContender implements LeaderContender {
+	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
 
-		/**
-		 * Callback method when current resourceManager is granted leadership
-		 *
-		 * @param leaderSessionID unique leadershipID
-		 */
-		@Override
-		public void grantLeadership(final UUID leaderSessionID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-					ResourceManager.this.leaderSessionID = leaderSessionID;
-					// confirming the leader session ID might be blocking,
-					leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-				}
-			});
-		}
+		private final JobID jobID;
+		private UUID leaderID;
 
-		/**
-		 * Callback method when current resourceManager lose leadership.
-		 */
-		@Override
-		public void revokeLeadership() {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was revoked leadership.", getAddress());
-					jobMasterGateways.clear();
-					startedTaskExecutorGateways.clear();
-					leaderSessionID = null;
-				}
-			});
+		private JobMasterLeaderListener(JobID jobID) {
+			this.jobID = jobID;
 		}
 
 		@Override
-		public String getAddress() {
-			return ResourceManager.this.getAddress();
+		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+			this.leaderID = leaderSessionID;
 		}
 
-		/**
-		 * Handles error occurring in the leader election service
-		 *
-		 * @param exception Exception being thrown in the leader election service
-		 */
 		@Override
 		public void handleError(final Exception exception) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-					// terminate ResourceManager in case of an error
-					shutDown();
-				}
-			});
+			// TODO: handle errors reported by the LeaderRetrievalService (currently ignored)
 		}
 	}
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 30a096f..d8b8ebe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import java.util.UUID;
 
 /**
@@ -37,21 +38,18 @@ public interface ResourceManagerGateway extends RpcGateway {
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
-	 * @param jobMasterRegistration Job master registration information
-	 * @param timeout Timeout for the future to complete
+	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+	 * @param jobMasterAddress        The address of the JobMaster that registers
+	 * @param jobID                   The Job ID of the JobMaster that registers
+	 * @param timeout                 Timeout for the future to complete
 	 * @return Future registration response
 	 */
 	Future<RegistrationResponse> registerJobMaster(
-		JobMasterRegistration jobMasterRegistration,
-		@RpcTimeout FiniteDuration timeout);
+		UUID resourceManagerLeaderId,
+		String jobMasterAddress,
+		JobID jobID,
+		@RpcTimeout Time timeout);
 
-	/**
-	 * Register a {@link JobMaster} at the resource manager.
-	 *
-	 * @param jobMasterRegistration Job master registration information
-	 * @return Future registration response
-	 */
-	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
 
 	/**
 	 * Requests a slot from the resource manager.
@@ -59,15 +57,15 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest Slot request
 	 * @return Future slot assignment
 	 */
-	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+	Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
 
 	/**
 	 * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.
 	 *
 	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
-	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
-	 * @param resourceID               The resource ID of the TaskExecutor that registers
-	 * @param timeout                  The timeout for the response.
+	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
+	 * @param resourceID              The resource ID of the TaskExecutor that registers
+	 * @param timeout                 The timeout for the response.
 	 *
 	 * @return The future to the response by the ResourceManager.
 	 */
@@ -75,5 +73,5 @@ public interface ResourceManagerGateway extends RpcGateway {
 		UUID resourceManagerLeaderId,
 		String taskExecutorAddress,
 		ResourceID resourceID,
-		@RpcTimeout FiniteDuration timeout);
+		@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
index bd78a47..f8dfdc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -35,7 +35,7 @@ public class TaskExecutorRegistration implements Serializable {
 	private InstanceID instanceID;
 
 	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
-		InstanceID instanceID) {
+									InstanceID instanceID) {
 		this.taskExecutorGateway = taskExecutorGateway;
 		this.instanceID = instanceID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 5d0013c..a6d2196 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
new file mode 100644
index 0000000..332c093
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class ResourceManagerJobMasterTest {
+
+	private TestingSerialRpcService rpcService;
+
+	@Before
+	public void setup() throws Exception {
+		rpcService = new TestingSerialRpcService(); // a new RPC service instance is created before every test method
+	}
+
+	@After
+	public void teardown() throws Exception {
+		rpcService.stopService(); // stops the RPC service that setup() created for this test method
+	}
+
+	/**
+	 * Test receive normal registration from job master: matching leader IDs and a registered address yield a JobMasterRegistrationSuccess
+	 */
+	@Test
+	public void testRegisterJobMaster() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// test response successful
+		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID);
+		RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+		assertTrue(response instanceof JobMasterRegistrationSuccess);
+	}
+
+	/**
+	 * Test receive registration with unmatched resource manager leader session ID from job master
+	 */
+	@Test
+	public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		grantResourceManagerLeadership(resourceManagerLeaderElectionService); // the granted session ID is deliberately not used below
+
+		// a registration that carries a resource manager leader session ID other than the granted one is expected to be declined
+		UUID differentLeaderSessionID = UUID.randomUUID();
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID);
+		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	/**
+	 * Test receive registration with unmatched job master leader session ID from job master
+	 */
+	@Test
+	public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// leadership is granted exactly once so rmLeaderSessionId stays current; only the job master leader ID differs from jmLeaderID
+		UUID differentLeaderSessionID = UUID.randomUUID();
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID);
+		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	/**
+	 * Test receive registration with invalid address from job master
+	 */
+	@Test
+	public void testRegisterJobMasterFromInvalidAddress() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderSessionId = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderSessionId);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// both leader IDs match what was configured above; only the address differs from the one registered by mockJobMaster, so a Decline is expected
+		String invalidAddress = "/jobMasterAddress2";
+		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, invalidAddress, jobID);
+		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	/**
+	 * Check and verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
+	 */
+	@Test
+	public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderSessionId = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderSessionId);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		JobID unknownJobIDToHAServices = new JobID();
+		// leader IDs and address are valid; only the JobID has no retriever configured in the HA services, so a Decline is expected
+		Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
+		RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
+		assertTrue(response instanceof RegistrationResponse.Decline);
+	}
+
+	// Registers a mocked JobMasterGateway under the given address and returns a newly created JobID.
+	private JobID mockJobMaster(String jobMasterAddress) {
+		rpcService.registerGateway(jobMasterAddress, mock(JobMasterGateway.class));
+		// the JobID is created independently of the gateway registered above
+		return new JobID();
+	}
+
+	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); // retriever is keyed by the given jobID
+		haServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+		ResourceManager startedResourceManager = new ResourceManager(rpcService, haServices, new SimpleSlotManager());
+		startedResourceManager.start();
+		return startedResourceManager;
+	}
+
+	private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) {
+		final UUID grantedSessionId = UUID.randomUUID(); // every call uses a newly generated session ID
+		resourceManagerLeaderElectionService.isLeader(grantedSessionId);
+		return grantedSessionId;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
new file mode 100644
index 0000000..ed7c7d7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class ResourceManagerTaskExecutorTest {
+
+	private TestingSerialRpcService rpcService;
+
+	@Before
+	public void setup() throws Exception {
+		rpcService = new TestingSerialRpcService(); // a new RPC service instance is created before every test method
+	}
+
+	@After
+	public void teardown() throws Exception {
+		rpcService.stopService(); // stops the RPC service that setup() created for this test method
+	}
+
+	/**
+	 * Test receive normal registration from task executor and receive duplicate registration from task executor
+	 */
+	@Test
+	public void testRegisterTaskExecutor() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+		final UUID leaderSessionId = grantLeadership(rmLeaderElectionService);
+
+		// test response successful
+		Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
+		RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+		assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+		// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
+		Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
+		RegistrationResponse duplicateResponse = duplicateFuture.get(5, TimeUnit.SECONDS); // bounded wait so a missing reply fails the test instead of hanging it
+		assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
+		assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
+	}
+
+	/**
+	 * Test receive registration with unmatched resource manager leader session ID from task executor
+	 */
+	@Test
+	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+		grantLeadership(rmLeaderElectionService); // the granted session ID is deliberately not used below
+
+		// a registration that carries a leader session ID other than the granted one is expected to be declined
+		UUID differentLeaderSessionID = UUID.randomUUID();
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
+		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	/**
+	 * Verify that a registration from an address other than the one registered for the task executor is declined
+	 */
+	@Test
+	public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
+		final String registeredAddress = "/taskExecutor1";
+		final ResourceID resourceID = mockTaskExecutor(registeredAddress);
+		final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+		final UUID leaderSessionId = grantLeadership(rmLeaderElectionService);
+
+		// the leader session ID is the granted one, but no gateway was registered under this address
+		final String unregisteredAddress = "/taskExecutor2";
+		RegistrationResponse response = resourceManager.registerTaskExecutor(leaderSessionId, unregisteredAddress, resourceID).get(5, TimeUnit.SECONDS);
+		assertTrue(response instanceof RegistrationResponse.Decline);
+	}
+
+	// Registers a mocked TaskExecutorGateway under the given address and returns a generated ResourceID.
+	private ResourceID mockTaskExecutor(String taskExecutorAddress) {
+		rpcService.registerGateway(taskExecutorAddress, mock(TaskExecutorGateway.class));
+		// the ResourceID is generated independently of the gateway registered above
+		return ResourceID.generate();
+	}
+
+	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
+		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); // only the RM election service is configured here
+		ResourceManager startedResourceManager = new ResourceManager(rpcService, haServices, new SimpleSlotManager());
+		startedResourceManager.start();
+		return startedResourceManager;
+	}
+
+	private UUID grantLeadership(TestingLeaderElectionService leaderElectionService) {
+		final UUID grantedSessionId = UUID.randomUUID(); // every call uses a newly generated session ID
+		leaderElectionService.isLeader(grantedSessionId);
+		return grantedSessionId;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
deleted file mode 100644
index b75d9b8..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class ResourceManagerTest {
-
-	private TestingSerialRpcService rpcService;
-
-	@Before
-	public void setup() throws Exception {
-		rpcService = new TestingSerialRpcService();
-	}
-
-	@After
-	public void teardown() throws Exception {
-		rpcService.stopService();
-	}
-
-	/**
-	 * Test receive normal registration from task executor and receive duplicate registration from task executor
-	 *
-	 * @throws Exception
-	 */
-	@Test
-	public void testRegisterTaskExecutor() throws Exception {
-		String taskExecutorAddress = "/taskExecutor1";
-		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
-		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
-		final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
-		final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
-
-		// test response successful
-		Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
-		RegistrationResponse response = Await.result(successfulFuture, new FiniteDuration(0, TimeUnit.SECONDS));
-		assertTrue(response instanceof TaskExecutorRegistrationSuccess);
-
-		// test response successful with previous instanceID when receive duplicate registration from taskExecutor
-		Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
-		RegistrationResponse duplicateResponse = Await.result(duplicateFuture, new FiniteDuration(0, TimeUnit.SECONDS));
-		assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
-		assertEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
-	}
-
-	/**
-	 * Test receive registration with unmatched leadershipId from task executor
-	 *
-	 * @throws Exception
-	 */
-	@Test(expected = LeaderSessionIDException.class)
-	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
-		String taskExecutorAddress = "/taskExecutor1";
-		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
-		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
-		final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
-		final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
-
-		// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
-		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
-		Await.result(unMatchedLeaderFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
-	}
-
-	/**
-	 * Test receive registration with invalid address from task executor
-	 *
-	 * @throws Exception
-	 */
-	@Test(expected = Exception.class)
-	public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
-		String taskExecutorAddress = "/taskExecutor1";
-		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
-		TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
-		final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
-		final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
-
-		// test throw exception when receive a registration from taskExecutor which takes invalid address
-		String invalidAddress = "/taskExecutor2";
-		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID);
-		Await.result(invalidAddressFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
-	}
-
-	private ResourceID mockTaskExecutor(String taskExecutorAddress) {
-		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		ResourceID taskExecutorResourceID = ResourceID.generate();
-		rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
-		return taskExecutorResourceID;
-	}
-
-	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService leaderElectionService) {
-		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
-		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
-		resourceManager.start();
-		return resourceManager;
-	}
-
-	private UUID grantResourceManagerLeadership(TestingLeaderElectionService leaderElectionService) {
-		UUID leaderSessionId = UUID.randomUUID();
-		leaderElectionService.isLeader(leaderSessionId);
-		return leaderSessionId;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4076bd74/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 1f9e7e8..0232fab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,10 +24,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
-import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
@@ -88,14 +92,20 @@ public class SlotProtocolTest extends TestLogger {
 
 		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
 
+		final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices();
+		final UUID rmLeaderID = UUID.randomUUID();
+		final UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderElectionService rmLeaderElectionService =
+			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
 		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
 		ResourceManager resourceManager =
-			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+			new ResourceManager(testRpcService, testingHaServices, slotManager);
 		resourceManager.start();
+		rmLeaderElectionService.isLeader(rmLeaderID);
 
 		Future<RegistrationResponse> registrationFuture =
-			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+			resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
 		try {
 			registrationFuture.get(5, TimeUnit.SECONDS);
 		} catch (Exception e) {
@@ -158,16 +168,23 @@ public class SlotProtocolTest extends TestLogger {
 
 		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
 
+		final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices();
+		final UUID rmLeaderID = UUID.randomUUID();
+		final UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderElectionService rmLeaderElectionService =
+			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
+
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
 		TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
 		ResourceManager resourceManager =
-			new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+			new ResourceManager(testRpcService, testingHaServices, slotManager);
 		resourceManager.start();
+		rmLeaderElectionService.isLeader(rmLeaderID);
 
 		Future<RegistrationResponse> registrationFuture =
-			resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+			resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
 		try {
 			registrationFuture.get(5, TimeUnit.SECONDS);
 		} catch (Exception e) {
@@ -208,6 +225,20 @@ public class SlotProtocolTest extends TestLogger {
 		verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
 	}
 
+	private static TestingLeaderElectionService configureHA(
+			TestingHighAvailabilityServices testingHA, JobID jobID, String rmAddress, UUID rmID, String jmAddress, UUID jmID) {
+		// resource manager side: an election service plus a retrieval service constructed with rmAddress/rmID
+		final TestingLeaderElectionService rmElection = new TestingLeaderElectionService();
+		testingHA.setResourceManagerLeaderElectionService(rmElection);
+		testingHA.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService(rmAddress, rmID));
+
+		// job master side: an election service plus a retrieval service for jobID constructed with jmAddress/jmID
+		testingHA.setJobMasterLeaderElectionService(new TestingLeaderElectionService());
+		testingHA.setJobMasterLeaderRetriever(jobID, new TestingLeaderRetrievalService(jmAddress, jmID));
+
+		// only the resource manager election service is handed back to the caller
+		return rmElection;
+	}
 
 	private static class TestingSlotManager extends SimpleSlotManager {