You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:36 UTC

[31/50] [abbrv] flink git commit: [FLINK-4537] rebase and refine

[FLINK-4537] rebase and refine


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4efa90f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4efa90f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4efa90f4

Branch: refs/heads/flip-6
Commit: 4efa90f46ca5e6c9e3a2d545bdb58a388bcf5e1f
Parents: f0501df
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 21 14:13:12 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:45 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/JobMasterRegistration.java  |  52 +++---
 .../resourcemanager/ResourceManager.java        | 165 ++++++++-----------
 .../slotmanager/SlotManager.java                |  29 +++-
 3 files changed, 110 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4efa90f4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 7b8ec70..981441f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,59 +18,47 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-<<<<<<< HEAD
 import org.apache.flink.api.common.JobID;
-=======
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
->>>>>>> db98efb... rsourceManager registration with JobManager
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 
-import java.io.Serializable;
 import java.util.UUID;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * This class is responsible for grouping the JobMasterGateway and the LeaderSessionID of a registered job master
  */
-public class JobMasterRegistration implements Serializable {
+public class JobMasterRegistration implements LeaderRetrievalListener {
 
-<<<<<<< HEAD
-	private final String address;
+	private final JobMasterGateway gateway;
 	private final JobID jobID;
+	private final UUID leaderSessionID;
+	private LeaderRetrievalListener retriever;
 
-	public JobMasterRegistration(String address, JobID jobID) {
-		this.address = address;
+	public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, UUID leaderSessionID) {
+		this.gateway = gateway;
 		this.jobID = jobID;
-=======
-	private static final long serialVersionUID = -2316627821716999527L;
-
-	private final JobMasterGateway jobMasterGateway;
-
-	private UUID jobMasterLeaderSessionID;
-
-	public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
-		this.jobMasterGateway = checkNotNull(jobMasterGateway);
+		this.leaderSessionID = leaderSessionID;
 	}
 
-	public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) {
-		this.jobMasterGateway = checkNotNull(jobMasterGateway);
-		this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
+	public JobMasterGateway getGateway() {
+		return gateway;
 	}
 
-	public JobMasterGateway getJobMasterGateway() {
-		return jobMasterGateway;
+	public UUID getLeaderSessionID() {
+		return leaderSessionID;
 	}
 
-	public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
-		this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
->>>>>>> db98efb... rsourceManager registration with JobManager
+	public JobID getJobID() {
+		return jobID;
 	}
 
-	public UUID getJobMasterLeaderSessionID() {
-		return jobMasterLeaderSessionID;
+	@Override
+	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+		
 	}
 
-	public JobID getJobID() {
-		return jobID;
+	@Override
+	public void handleError(Exception exception) {
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4efa90f4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8be1455..aae4874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.dispatch.Futures;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -29,26 +28,31 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-<<<<<<< HEAD
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-=======
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
->>>>>>> db98efb... rsourceManager registration with JobManager
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -62,17 +66,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-<<<<<<< HEAD
 public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
 
 	private final Logger LOG = LoggerFactory.getLogger(getClass());
 
 	private final Map<JobID, JobMasterGateway> jobMasterGateways;
-=======
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-	/** the mapping relationship of JobID and JobMasterGateway */
-	private final Map<JobID, JobMasterRegistration> jobMasters;
->>>>>>> db98efb... rsourceManager registration with JobManager
+
+	private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
 
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -88,12 +88,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 			SlotManager slotManager) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
-<<<<<<< HEAD
 		this.jobMasterGateways = new HashMap<>();
 		this.slotManager = slotManager;
-=======
-		this.jobMasters = new HashMap<>(16);
->>>>>>> db98efb... rsourceManager registration with JobManager
+		this.jobMasterLeaderRetrievalListeners = new HashSet<>();
 	}
 
 	@Override
@@ -113,7 +110,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	public void shutDown() {
 		try {
 			leaderElectionService.stop();
-			for(JobID jobID : jobMasters.keySet()) {
+			for(JobID jobID : jobMasterGateways.keySet()) {
 				highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
 			}
 			super.shutDown();
@@ -142,52 +139,64 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 * @return Future registration response
 	 */
 	@RpcMethod
-<<<<<<< HEAD
-	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-		final Future<JobMasterGateway> jobMasterFuture =
-			getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
-		final JobID jobID = jobMasterRegistration.getJobID();
-=======
-	public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {
+	public Future<RegistrationResponse> registerJobMaster(
+		final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
+		final String jobMasterAddress, final JobID jobID) {
+
+		checkNotNull(resourceManagerLeaderId);
+		checkNotNull(jobMasterAddress);
+		checkNotNull(jobID);
+
+		// TODO mxm The leader retrieval needs to be split up in an async part which runs outside the main execution thread
+		// The state updates should be performed inside the main thread
+
+		final FlinkCompletableFuture<RegistrationResponse> future = new FlinkCompletableFuture<>();
 
 		if(!leaderSessionID.equals(resourceManagerLeaderId)) {
-			log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID  {}",
+			log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
+					" did not equal the received leader session ID  {}",
 				jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
-			return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+			future.complete(new RegistrationResponse.Decline("Invalid leader session id"));
+			return future;
 		}
 
-		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
->>>>>>> db98efb... rsourceManager registration with JobManager
+		final LeaderConnectionInfo jobMasterLeaderInfo;
+		try {
+			jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+				highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
+		} catch (Exception e) {
+			LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+			future.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+			return future;
+		}
+
+		if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
+			LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
+			future.complete(new RegistrationResponse.Decline("JobManager is not leading"));
+			return future;
+		}
 
-		return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
+		Future<JobMasterGateway> jobMasterGatewayFuture =
+			getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
+
+		return jobMasterGatewayFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
 			@Override
 			public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
-<<<<<<< HEAD
+
+				final JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+				try {
+					LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+					jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+				} catch (Exception e) {
+					LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+					return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+				}
+				jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
 				final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
 				if (existingGateway != null) {
-					LOG.info("Replacing existing gateway {} for JobID {} with  {}.",
-						existingGateway, jobID, jobMasterGateway);
-				}
-				return new RegistrationResponse(true);
-=======
-				if (jobMasters.containsKey(jobID)) {
-					JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID());
-					jobMasters.put(jobID, jobMasterRegistration);
 					log.info("Replacing gateway for registered JobID {}.", jobID);
-				} else {
-					JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
-					jobMasters.put(jobID, jobMasterRegistration);
-					try {
-						highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID));
-					} catch(Throwable e) {
-						log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster",
-							jobID, jobMasterAddress);
-						return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster");
-					}
 				}
-
 				return new JobMasterRegistrationSuccess(5000);
->>>>>>> db98efb... rsourceManager registration with JobManager
 			}
 		}, getMainThreadExecutor());
 	}
@@ -228,26 +237,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	}
 
 
-<<<<<<< HEAD
 	// ------------------------------------------------------------------------
 	//  Leader Contender
 	// ------------------------------------------------------------------------
-=======
-		/**
-		 * Callback method when current resourceManager lose leadership.
-		 */
-		@Override
-		public void revokeLeadership() {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was revoked leadership.", getAddress());
-					jobMasters.clear();
-					leaderSessionID = null;
-				}
-			});
-		}
->>>>>>> db98efb... rsourceManager registration with JobManager
 
 	/**
 	 * Callback method when current resourceManager is granted leadership
@@ -263,7 +255,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 				// confirming the leader session ID might be blocking,
 				leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 				// notify SlotManager
-				slotManager.notifyLeaderAddress(getAddress(), leaderSessionID);
+				slotManager.setLeaderUUID(leaderSessionID);
 				ResourceManager.this.leaderSessionID = leaderSessionID;
 			}
 		});
@@ -279,7 +271,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 			public void run() {
 				log.info("ResourceManager {} was revoked leadership.", getAddress());
 				jobMasterGateways.clear();
-				ResourceManager.this.leaderSessionID = null;
+				slotManager.clearState();
+				leaderSessionID = null;
 			}
 		});
 	}
@@ -291,20 +284,15 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 	 */
 	@Override
 	public void handleError(final Exception exception) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-				// notify SlotManager
-				slotManager.handleError(exception);
-				// terminate ResourceManager in case of an error
-				shutDown();
-			}
-		});
+		log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+		// terminate ResourceManager in case of an error
+		shutDown();
 	}
 
-	private class JobMasterLeaderListener implements LeaderRetrievalListener {
+	private static class JobMasterLeaderListener implements LeaderRetrievalListener {
+
 		private final JobID jobID;
+		private UUID leaderID;
 
 		private JobMasterLeaderListener(JobID jobID) {
 			this.jobID = jobID;
@@ -312,25 +300,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID);
-					// update job master leader session id
-					JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID);
-					jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
-				}
-			});
+			this.leaderID = leaderSessionID;
 		}
 
 		@Override
 		public void handleError(final Exception exception) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception);
-				}
-			});
+			// TODO
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4efa90f4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 97176b2..5d0013c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * </ul>
  * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
  */
-public abstract class SlotManager implements LeaderRetrievalListener {
+public abstract class SlotManager {
 
 	protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
@@ -514,22 +514,33 @@ public abstract class SlotManager implements LeaderRetrievalListener {
 		public int size() {
 			return allocatedSlots.size();
 		}
+
+		public void clear() {
+			allocatedSlots.clear();
+			allocatedSlotsByAllocationId.clear();
+		}
+	}
+
+	/**
+	 * Clears the state of the SlotManager after leadership revocation
+	 */
+	public void clearState() {
+		taskManagerGateways.clear();
+		registeredSlots.clear();
+		pendingSlotRequests.clear();
+		freeSlots.clear();
+		allocationMap.clear();
+		leaderID = null;
 	}
 
 	// ------------------------------------------------------------------------
-	//  High availability
+	//  High availability (called by the ResourceManager)
 	// ------------------------------------------------------------------------
 
-	@Override
-	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+	public void setLeaderUUID(UUID leaderSessionID) {
 		this.leaderID = leaderSessionID;
 	}
 
-	@Override
-	public void handleError(Exception exception) {
-		LOG.error("Slot Manager received an error from the leader service", exception);
-	}
-
 	// ------------------------------------------------------------------------
 	//  Testing utilities
 	// ------------------------------------------------------------------------