You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:53 UTC
[33/50] [abbrv] flink git commit: [FLINK-4537] rebase and refine
[FLINK-4537] rebase and refine
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c67aa96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c67aa96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c67aa96
Branch: refs/heads/flip-6
Commit: 0c67aa96e4311154b330ec32950f113c928732f8
Parents: 3d737b0
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 21 14:13:12 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:43 2016 +0200
----------------------------------------------------------------------
.../resourcemanager/JobMasterRegistration.java | 52 +++---
.../resourcemanager/ResourceManager.java | 165 ++++++++-----------
.../slotmanager/SlotManager.java | 29 +++-
3 files changed, 110 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c67aa96/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 7b8ec70..981441f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,59 +18,47 @@
package org.apache.flink.runtime.resourcemanager;
-<<<<<<< HEAD
import org.apache.flink.api.common.JobID;
-=======
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
->>>>>>> db98efb... rsourceManager registration with JobManager
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import java.io.Serializable;
import java.util.UUID;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* This class is responsible for group the JobMasterGateway and the LeaderSessionID of a registered job master
*/
-public class JobMasterRegistration implements Serializable {
+public class JobMasterRegistration implements LeaderRetrievalListener {
-<<<<<<< HEAD
- private final String address;
+ private final JobMasterGateway gateway;
private final JobID jobID;
+ private final UUID leaderSessionID;
+ private LeaderRetrievalListener retriever;
- public JobMasterRegistration(String address, JobID jobID) {
- this.address = address;
+ public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, UUID leaderSessionID) {
+ this.gateway = gateway;
this.jobID = jobID;
-=======
- private static final long serialVersionUID = -2316627821716999527L;
-
- private final JobMasterGateway jobMasterGateway;
-
- private UUID jobMasterLeaderSessionID;
-
- public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
- this.jobMasterGateway = checkNotNull(jobMasterGateway);
+ this.leaderSessionID = leaderSessionID;
}
- public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) {
- this.jobMasterGateway = checkNotNull(jobMasterGateway);
- this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
+ public JobMasterGateway getGateway() {
+ return gateway;
}
- public JobMasterGateway getJobMasterGateway() {
- return jobMasterGateway;
+ public UUID getLeaderSessionID() {
+ return leaderSessionID;
}
- public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
- this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
->>>>>>> db98efb... rsourceManager registration with JobManager
+ public JobID getJobID() {
+ return jobID;
}
- public UUID getJobMasterLeaderSessionID() {
- return jobMasterLeaderSessionID;
+ @Override
+ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+
}
- public JobID getJobID() {
- return jobID;
+ @Override
+ public void handleError(Exception exception) {
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c67aa96/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8be1455..aae4874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.resourcemanager;
-import akka.dispatch.Futures;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -29,26 +28,31 @@ import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-<<<<<<< HEAD
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-=======
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
->>>>>>> db98efb... rsourceManager registration with JobManager
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,17 +66,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
-<<<<<<< HEAD
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
private final Logger LOG = LoggerFactory.getLogger(getClass());
private final Map<JobID, JobMasterGateway> jobMasterGateways;
-=======
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
- /** the mapping relationship of JobID and JobMasterGateway */
- private final Map<JobID, JobMasterRegistration> jobMasters;
->>>>>>> db98efb... rsourceManager registration with JobManager
+
+ private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
private final HighAvailabilityServices highAvailabilityServices;
@@ -88,12 +88,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
-<<<<<<< HEAD
this.jobMasterGateways = new HashMap<>();
this.slotManager = slotManager;
-=======
- this.jobMasters = new HashMap<>(16);
->>>>>>> db98efb... rsourceManager registration with JobManager
+ this.jobMasterLeaderRetrievalListeners = new HashSet<>();
}
@Override
@@ -113,7 +110,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
public void shutDown() {
try {
leaderElectionService.stop();
- for(JobID jobID : jobMasters.keySet()) {
+ for(JobID jobID : jobMasterGateways.keySet()) {
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
}
super.shutDown();
@@ -142,52 +139,64 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
* @return Future registration response
*/
@RpcMethod
-<<<<<<< HEAD
- public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
- final Future<JobMasterGateway> jobMasterFuture =
- getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
- final JobID jobID = jobMasterRegistration.getJobID();
-=======
- public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {
+ public Future<RegistrationResponse> registerJobMaster(
+ final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
+ final String jobMasterAddress, final JobID jobID) {
+
+ checkNotNull(resourceManagerLeaderId);
+ checkNotNull(jobMasterAddress);
+ checkNotNull(jobID);
+
+ // TODO mxm Split the leader retrieval into an async part which runs outside the main execution thread;
+ // the state updates should then be performed inside the main thread.
+
+ final FlinkCompletableFuture<RegistrationResponse> future = new FlinkCompletableFuture<>();
if(!leaderSessionID.equals(resourceManagerLeaderId)) {
- log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}",
+ log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
+ " did not equal the received leader session ID {}",
jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
- return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+ future.complete(new RegistrationResponse.Decline("Invalid leader session id"));
+ return future;
}
- Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
->>>>>>> db98efb... rsourceManager registration with JobManager
+ final LeaderConnectionInfo jobMasterLeaderInfo;
+ try {
+ jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+ highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+ future.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+ return future;
+ }
+
+ if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
+ LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
+ future.complete(new RegistrationResponse.Decline("JobManager is not leading"));
+ return future;
+ }
- return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
+ Future<JobMasterGateway> jobMasterGatewayFuture =
+ getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
+
+ return jobMasterGatewayFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
@Override
public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
-<<<<<<< HEAD
+
+ final JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+ try {
+ LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+ jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+ } catch (Exception e) {
+ LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+ return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+ }
+ jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
if (existingGateway != null) {
- LOG.info("Replacing existing gateway {} for JobID {} with {}.",
- existingGateway, jobID, jobMasterGateway);
- }
- return new RegistrationResponse(true);
-=======
- if (jobMasters.containsKey(jobID)) {
- JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID());
- jobMasters.put(jobID, jobMasterRegistration);
log.info("Replacing gateway for registered JobID {}.", jobID);
- } else {
- JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
- jobMasters.put(jobID, jobMasterRegistration);
- try {
- highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID));
- } catch(Throwable e) {
- log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster",
- jobID, jobMasterAddress);
- return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster");
- }
}
-
return new JobMasterRegistrationSuccess(5000);
->>>>>>> db98efb... rsourceManager registration with JobManager
}
}, getMainThreadExecutor());
}
@@ -228,26 +237,9 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
}
-<<<<<<< HEAD
// ------------------------------------------------------------------------
// Leader Contender
// ------------------------------------------------------------------------
-=======
- /**
- * Callback method when current resourceManager lose leadership.
- */
- @Override
- public void revokeLeadership() {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("ResourceManager {} was revoked leadership.", getAddress());
- jobMasters.clear();
- leaderSessionID = null;
- }
- });
- }
->>>>>>> db98efb... rsourceManager registration with JobManager
/**
* Callback method when current resourceManager is granted leadership
@@ -263,7 +255,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
// confirming the leader session ID might be blocking,
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
// notify SlotManager
- slotManager.notifyLeaderAddress(getAddress(), leaderSessionID);
+ slotManager.setLeaderUUID(leaderSessionID);
ResourceManager.this.leaderSessionID = leaderSessionID;
}
});
@@ -279,7 +271,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
public void run() {
log.info("ResourceManager {} was revoked leadership.", getAddress());
jobMasterGateways.clear();
- ResourceManager.this.leaderSessionID = null;
+ slotManager.clearState();
+ leaderSessionID = null;
}
});
}
@@ -291,20 +284,15 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
*/
@Override
public void handleError(final Exception exception) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("ResourceManager received an error from the LeaderElectionService.", exception);
- // notify SlotManager
- slotManager.handleError(exception);
- // terminate ResourceManager in case of an error
- shutDown();
- }
- });
+ log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+ // terminate ResourceManager in case of an error
+ shutDown();
}
- private class JobMasterLeaderListener implements LeaderRetrievalListener {
+ private static class JobMasterLeaderListener implements LeaderRetrievalListener {
+
private final JobID jobID;
+ private UUID leaderID;
private JobMasterLeaderListener(JobID jobID) {
this.jobID = jobID;
@@ -312,25 +300,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID);
- // update job master leader session id
- JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID);
- jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
- }
- });
+ this.leaderID = leaderSessionID;
}
@Override
public void handleError(final Exception exception) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception);
- }
- });
+ // TODO
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c67aa96/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 97176b2..5d0013c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -59,7 +59,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* </ul>
* <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
*/
-public abstract class SlotManager implements LeaderRetrievalListener {
+public abstract class SlotManager {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@@ -514,22 +514,33 @@ public abstract class SlotManager implements LeaderRetrievalListener {
public int size() {
return allocatedSlots.size();
}
+
+ public void clear() {
+ allocatedSlots.clear();
+ allocatedSlotsByAllocationId.clear();
+ }
+ }
+
+ /**
+ * Clears the state of the SlotManager after leadership revocation.
+ */
+ public void clearState() {
+ taskManagerGateways.clear();
+ registeredSlots.clear();
+ pendingSlotRequests.clear();
+ freeSlots.clear();
+ allocationMap.clear();
+ leaderID = null;
}
// ------------------------------------------------------------------------
- // High availability
+ // High availability (called by the ResourceManager)
// ------------------------------------------------------------------------
- @Override
- public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+ public void setLeaderUUID(UUID leaderSessionID) {
this.leaderID = leaderSessionID;
}
- @Override
- public void handleError(Exception exception) {
- LOG.error("Slot Manager received an error from the leader service", exception);
- }
-
// ------------------------------------------------------------------------
// Testing utilities
// ------------------------------------------------------------------------