You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:40:46 UTC
[19/50] [abbrv] flink git commit: [FLINK-4516] update leadership
information in ResourceManager
[FLINK-4516] update leadership information in ResourceManager
The leadership information remained static for connected
JobMasters. This updates it to remove stale JobMasters when they lose
leadership status.
This closes #2624
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bf4d3843
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bf4d3843
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bf4d3843
Branch: refs/heads/flip-6
Commit: bf4d38432a5ca1fe2dd027333c53746dd1abb11b
Parents: 205de72
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Oct 10 17:36:10 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 1 09:39:30 2016 +0100
----------------------------------------------------------------------
.../resourcemanager/ResourceManager.java | 196 +++++++++++++------
.../resourcemanager/ResourceManagerGateway.java | 4 +-
.../ResourceManagerServices.java | 6 +
.../registration/JobMasterRegistration.java | 62 ++++++
.../slotmanager/SlotManager.java | 16 +-
.../resourcemanager/TestingSlotManager.java | 8 +
.../slotmanager/SlotManagerTest.java | 10 +-
7 files changed, 224 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bf4d3843/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index d2d00cf..8fbb34b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -40,6 +41,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
+import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
@@ -53,17 +55,14 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.util.LeaderConnectionInfo;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -85,10 +84,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
protected static final int EXIT_CODE_FATAL_ERROR = -13;
/** All currently registered JobMasterGateways scoped by JobID. */
- private final Map<JobID, JobMasterGateway> jobMasterGateways;
+ private final Map<JobID, JobMasterRegistration> jobMasters;
- /** LeaderListeners for all registered JobMasters. */
- private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners;
+ /** LeaderListeners for all registered JobIDs. */
+ private final Map<JobID, JobIdLeaderListener> leaderListeners;
/** All currently registered TaskExecutors with there framework specific worker information. */
private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
@@ -106,7 +105,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private LeaderElectionService leaderElectionService;
/** ResourceManager's leader session id which is updated on leader election. */
- private UUID leaderSessionID;
+ private volatile UUID leaderSessionID;
/** All registered listeners for status updates of the ResourceManager. */
private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
@@ -121,8 +120,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.slotManagerFactory = checkNotNull(slotManagerFactory);
- this.jobMasterGateways = new HashMap<>();
- this.jobMasterLeaderRetrievalListeners = new HashMap<>();
+ this.jobMasters = new HashMap<>();
+ this.leaderListeners = new HashMap<>();
this.taskExecutors = new HashMap<>();
this.leaderSessionID = new UUID(0, 0);
infoMessageListeners = new HashMap<>();
@@ -149,9 +148,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
public void shutDown() {
try {
leaderElectionService.stop();
- for (JobID jobID : jobMasterGateways.keySet()) {
- highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop();
- }
+ clearState();
super.shutDown();
} catch (Throwable e) {
log.error("A fatal error happened when shutdown the ResourceManager", e);
@@ -185,6 +182,24 @@ public abstract class ResourceManager<WorkerType extends Serializable>
checkNotNull(jobMasterAddress);
checkNotNull(jobID);
+ // create a leader retriever in case it doesn't exist
+ final JobIdLeaderListener jobIdLeaderListener;
+ if (leaderListeners.containsKey(jobID)) {
+ jobIdLeaderListener = leaderListeners.get(jobID);
+ } else {
+ try {
+ LeaderRetrievalService jobMasterLeaderRetriever =
+ highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
+ jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
+ } catch (Exception e) {
+ log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+ FlinkCompletableFuture<RegistrationResponse> responseFuture = new FlinkCompletableFuture<>();
+ responseFuture.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+ return responseFuture;
+ }
+ leaderListeners.put(jobID, jobIdLeaderListener);
+ }
+
return getRpcService()
.execute(new Callable<JobMasterGateway>() {
@Override
@@ -197,21 +212,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
throw new Exception("Invalid leader session id");
}
- final LeaderConnectionInfo jobMasterLeaderInfo;
- try {
- jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
- highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
- } catch (Exception e) {
- log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
- throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
- }
-
- if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
- log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
- throw new Exception("JobManager is not leading");
+ if (!jobIdLeaderListener.getLeaderID().get(timeout.getSize(), timeout.getUnit())
+ .equals(jobMasterLeaderId)) {
+ throw new Exception("Leader Id did not match");
}
- return getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, TimeUnit.SECONDS);
+ return getRpcService().connect(jobMasterAddress, JobMasterGateway.class)
+ .get(timeout.getSize(), timeout.getUnit());
}
})
.handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>() {
@@ -220,24 +227,34 @@ public abstract class ResourceManager<WorkerType extends Serializable>
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
- } else {
- if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
- JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
- try {
- LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
- jobMasterLeaderRetriever.start(jobMasterLeaderListener);
- } catch (Exception e) {
- log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
- return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
- }
- jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
- }
- final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
- if (existingGateway != null) {
- log.info("Replacing gateway for registered JobID {}.", jobID);
+ }
+
+ if (!leaderSessionID.equals(resourceManagerLeaderId)) {
+ log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
+ " did not equal the received leader session ID {}",
+ jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
+ return new RegistrationResponse.Decline("Invalid leader session id");
+ }
+
+ try {
+ // LeaderID should be available now, but if not we fail the registration
+ UUID currentJobMasterLeaderId = jobIdLeaderListener.getLeaderID().getNow(null);
+ if (currentJobMasterLeaderId == null || !currentJobMasterLeaderId.equals(jobMasterLeaderId)) {
+ throw new Exception("Leader Id did not match");
}
- return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
+ } catch (Exception e) {
+ return new RegistrationResponse.Decline(e.getMessage());
+ }
+
+ final JobMasterRegistration registration =
+ new JobMasterRegistration(jobID, jobMasterLeaderId, jobMasterGateway);
+
+ final JobMasterRegistration existingRegistration = jobMasters.put(jobID, registration);
+ if (existingRegistration != null) {
+ log.info("Replacing JobMaster registration for newly registered JobMaster with JobID {}.", jobID);
}
+ return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
+
}
}, getMainThreadExecutor());
}
@@ -305,13 +322,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
SlotRequest slotRequest) {
JobID jobId = slotRequest.getJobId();
- JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
- JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId);
+ JobMasterRegistration jobMasterRegistration = jobMasters.get(jobId);
- UUID leaderID = jobMasterLeaderListener.getLeaderID();
-
- if (jobMasterGateway != null
- && jobMasterLeaderID.equals(leaderID)
+ if (jobMasterRegistration != null
+ && jobMasterLeaderID.equals(jobMasterRegistration.getLeaderID())
&& resourceManagerLeaderID.equals(leaderSessionID)) {
return slotManager.requestSlot(slotRequest);
} else {
@@ -371,8 +385,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
// confirming the leader session ID might be blocking,
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
- // notify SlotManager
- slotManager.setLeaderUUID(leaderSessionID);
ResourceManager.this.leaderSessionID = leaderSessionID;
}
});
@@ -387,10 +399,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
@Override
public void run() {
log.info("ResourceManager {} was revoked leadership.", getAddress());
- jobMasterGateways.clear();
- taskExecutors.clear();
- slotManager.clearState();
- leaderSessionID = new UUID(0, 0);
+ clearState();
}
});
}
@@ -577,6 +586,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private class DefaultResourceManagerServices implements ResourceManagerServices {
@Override
+ public UUID getLeaderID() {
+ return ResourceManager.this.leaderSessionID;
+ }
+
+ @Override
public void allocateResource(ResourceProfile resourceProfile) {
ResourceManager.this.startNewWorker(resourceProfile);
}
@@ -592,33 +606,95 @@ public abstract class ResourceManager<WorkerType extends Serializable>
}
}
- private static class JobMasterLeaderListener implements LeaderRetrievalListener {
+ /**
+ * Leader instantiated for each connected JobMaster
+ */
+ private class JobIdLeaderListener implements LeaderRetrievalListener {
private final JobID jobID;
- private UUID leaderID;
+ private final LeaderRetrievalService retrievalService;
- private JobMasterLeaderListener(JobID jobID) {
+ private final FlinkCompletableFuture<UUID> initialLeaderIdFuture;
+
+ private volatile UUID leaderID;
+
+ private JobIdLeaderListener(
+ JobID jobID,
+ LeaderRetrievalService retrievalService) throws Exception {
this.jobID = jobID;
+ this.retrievalService = retrievalService;
+ this.initialLeaderIdFuture = new FlinkCompletableFuture<>();
+ this.retrievalService.start(this);
+ }
+
+ public Future<UUID> getLeaderID() {
+ if (!initialLeaderIdFuture.isDone()) {
+ return initialLeaderIdFuture;
+ } else {
+ return FlinkCompletableFuture.completed(leaderID);
+ }
}
public JobID getJobID() {
return jobID;
}
- public UUID getLeaderID() {
- return leaderID;
+
+ public void stopService() throws Exception {
+ retrievalService.stop();
}
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
this.leaderID = leaderSessionID;
+
+ if (!initialLeaderIdFuture.isDone()) {
+ initialLeaderIdFuture.complete(leaderSessionID);
+ }
+
+ ResourceManager.this.runAsync(new Runnable() {
+ @Override
+ public void run() {
+ JobMasterRegistration jobMasterRegistration = ResourceManager.this.jobMasters.get(jobID);
+ if (jobMasterRegistration == null || !jobMasterRegistration.getLeaderID().equals(leaderSessionID)) {
+ // registration is not valid anymore, remove registration
+ ResourceManager.this.jobMasters.remove(jobID);
+ // leader listener is not necessary anymore
+ JobIdLeaderListener listener = ResourceManager.this.leaderListeners.remove(jobID);
+ if (listener != null) {
+ try {
+ listener.stopService();
+ } catch (Exception e) {
+ ResourceManager.this.handleError(e);
+ }
+ }
+ }
+ }
+ });
}
@Override
public void handleError(final Exception exception) {
- // TODO
+ ResourceManager.this.handleError(exception);
}
}
+ private void clearState() {
+ jobMasters.clear();
+ taskExecutors.clear();
+ slotManager.clearState();
+ Iterator<JobIdLeaderListener> leaderListenerIterator =
+ leaderListeners.values().iterator();
+ while (leaderListenerIterator.hasNext()) {
+ JobIdLeaderListener listener = leaderListenerIterator.next();
+ try {
+ listener.stopService();
+ } catch (Exception e) {
+ handleError(e);
+ }
+ leaderListenerIterator.remove();
+ }
+ leaderSessionID = new UUID(0, 0);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bf4d3843/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 3c81227..07e9e43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -61,14 +61,14 @@ public interface ResourceManagerGateway extends RpcGateway {
/**
* Requests a slot from the resource manager.
*
- * @param jobMasterLeaderID leader id of the JobMaster
* @param resourceManagerLeaderID leader if of the ResourceMaster
+ * @param jobMasterLeaderID leader if of the JobMaster
* @param slotRequest The slot to request
* @return The confirmation that the slot gets allocated
*/
Future<RMSlotRequestReply> requestSlot(
- UUID jobMasterLeaderID,
UUID resourceManagerLeaderID,
+ UUID jobMasterLeaderID,
SlotRequest slotRequest,
@RpcTimeout Time timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/bf4d3843/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
index b997a3a..16d0a7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import java.util.UUID;
import java.util.concurrent.Executor;
/**
@@ -27,6 +28,11 @@ import java.util.concurrent.Executor;
public interface ResourceManagerServices {
/**
+ * Gets the current leader id assigned at the ResourceManager.
+ */
+ UUID getLeaderID();
+
+ /**
* Allocates a resource according to the resource profile.
*/
void allocateResource(ResourceProfile resourceProfile);
http://git-wip-us.apache.org/repos/asf/flink/blob/bf4d3843/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
new file mode 100644
index 0000000..f417935
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+
+import java.util.UUID;
+
+/**
+ * This class is responsible for grouping the JobMasterGateway and the JobMaster's
+ * leader id
+ */
+public class JobMasterRegistration {
+
+ private static final long serialVersionUID = -2062957799469434614L;
+
+ private final JobID jobID;
+
+ private final UUID leaderID;
+
+ private final JobMasterGateway jobMasterGateway;
+
+ public JobMasterRegistration(
+ JobID jobID,
+ UUID leaderID,
+ JobMasterGateway jobMasterGateway) {
+ this.jobID = jobID;
+ this.leaderID = leaderID;
+ this.jobMasterGateway = jobMasterGateway;
+ }
+
+ public JobID getJobID() {
+ return jobID;
+ }
+
+
+ public UUID getLeaderID() {
+ return leaderID;
+ }
+
+ public JobMasterGateway getJobMasterGateway() {
+ return jobMasterGateway;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bf4d3843/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 7eb2d78..e312ea2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.UUID;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -85,9 +84,6 @@ public abstract class SlotManager {
private final Time timeout;
- /** The current leader id set by the ResourceManager */
- private UUID leaderID;
-
public SlotManager(ResourceManagerServices rmServices) {
this.rmServices = checkNotNull(rmServices);
this.registeredSlots = new HashMap<>(16);
@@ -96,7 +92,6 @@ public abstract class SlotManager {
this.allocationMap = new AllocationMap();
this.taskManagers = new HashMap<>();
this.timeout = Time.seconds(10);
- this.leaderID = new UUID(0, 0);
}
// ------------------------------------------------------------------------
@@ -303,7 +298,7 @@ public abstract class SlotManager {
final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration();
final Future<TMSlotRequestReply> slotRequestReplyFuture =
registration.getTaskExecutorGateway()
- .requestSlot(freeSlot.getSlotId(), allocationID, leaderID, timeout);
+ .requestSlot(freeSlot.getSlotId(), allocationID, rmServices.getLeaderID(), timeout);
slotRequestReplyFuture.handleAsync(new BiFunction<TMSlotRequestReply, Throwable, Void>() {
@Override
@@ -488,15 +483,6 @@ public abstract class SlotManager {
pendingSlotRequests.clear();
freeSlots.clear();
allocationMap.clear();
- leaderID = new UUID(0, 0);
- }
-
- // ------------------------------------------------------------------------
- // High availability (called by the ResourceManager)
- // ------------------------------------------------------------------------
-
- public void setLeaderUUID(UUID leaderSessionID) {
- this.leaderID = leaderSessionID;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bf4d3843/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
index 0b2c42b..67b208d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
@@ -26,6 +26,7 @@ import org.mockito.Mockito;
import java.util.Iterator;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.Executor;
public class TestingSlotManager extends SlotManager {
@@ -60,6 +61,13 @@ public class TestingSlotManager extends SlotManager {
private static class TestingResourceManagerServices implements ResourceManagerServices {
+ private final UUID leaderID = UUID.randomUUID();
+
+ @Override
+ public UUID getLeaderID() {
+ return leaderID;
+ }
+
@Override
public void allocateResource(ResourceProfile resourceProfile) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bf4d3843/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 0d2b40d..558d3c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -498,13 +498,21 @@ public class SlotManagerTest {
private static class TestingRmServices implements ResourceManagerServices {
- private List<ResourceProfile> allocatedContainers;
+ private final UUID leaderID;
+
+ private final List<ResourceProfile> allocatedContainers;
public TestingRmServices() {
+ this.leaderID = UUID.randomUUID();
this.allocatedContainers = new LinkedList<>();
}
@Override
+ public UUID getLeaderID() {
+ return leaderID;
+ }
+
+ @Override
public void allocateResource(ResourceProfile resourceProfile) {
allocatedContainers.add(resourceProfile);
}