You are viewing a plain text version of this content. The canonical link for it was provided as a hyperlink ("here"), which is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:46:05 UTC
[23/50] [abbrv] flink git commit: [FLINK-4535] rebase and refine
[FLINK-4535] rebase and refine
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/485ef003
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/485ef003
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/485ef003
Branch: refs/heads/flip-6
Commit: 485ef0035fe3f0d4335d880868ab9beb18731fdf
Parents: c9764c8
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 21 20:20:25 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:41 2016 +0200
----------------------------------------------------------------------
.../resourcemanager/JobMasterRegistration.java | 64 ----
.../resourcemanager/ResourceManager.java | 322 ++++++++++++-------
.../resourcemanager/ResourceManagerGateway.java | 36 +--
.../TaskExecutorRegistration.java | 2 +-
.../slotmanager/SlotManager.java | 1 -
.../ResourceManagerJobMasterTest.java | 174 ++++++++++
.../ResourceManagerTaskExecutorTest.java | 135 ++++++++
.../resourcemanager/ResourceManagerTest.java | 141 --------
.../slotmanager/SlotProtocolTest.java | 43 ++-
9 files changed, 574 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
deleted file mode 100644
index 981441f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-
-import java.util.UUID;
-
-/**
- * This class is responsible for group the JobMasterGateway and the LeaderSessionID of a registered job master
- */
-public class JobMasterRegistration implements LeaderRetrievalListener {
-
- private final JobMasterGateway gateway;
- private final JobID jobID;
- private final UUID leaderSessionID;
- private LeaderRetrievalListener retriever;
-
- public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, UUID leaderSessionID) {
- this.gateway = gateway;
- this.jobID = jobID;
- this.leaderSessionID = leaderSessionID;
- }
-
- public JobMasterGateway getGateway() {
- return gateway;
- }
-
- public UUID getLeaderSessionID() {
- return leaderSessionID;
- }
-
- public JobID getJobID() {
- return jobID;
- }
-
- @Override
- public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-
- }
-
- @Override
- public void handleError(Exception exception) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 15692b6..88b8a11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,29 +18,41 @@
package org.apache.flink.runtime.resourcemanager;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import scala.concurrent.Future;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -50,25 +62,38 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* It offers the following methods as part of its rpc interface to interact with the him remotely:
* <ul>
- * <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
+ * <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
- private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
- /** ResourceID and TaskExecutorRegistration mapping relationship of registered taskExecutors */
- private final Map<ResourceID, TaskExecutorRegistration> startedTaskExecutorGateways;
+ private final Map<JobID, JobMasterGateway> jobMasterGateways;
+
+ private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners;
+
+ private final Map<ResourceID, TaskExecutorRegistration> taskExecutorGateways;
private final HighAvailabilityServices highAvailabilityServices;
- private LeaderElectionService leaderElectionService = null;
- private UUID leaderSessionID = null;
- public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
+ private LeaderElectionService leaderElectionService;
+
+ private final SlotManager slotManager;
+
+ private UUID leaderSessionID;
+
+ public ResourceManager(
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
- this.jobMasterGateways = new HashMap<>(16);
- this.startedTaskExecutorGateways = new HashMap<>(16);
+ this.jobMasterGateways = new HashMap<>();
+ this.slotManager = slotManager;
+ this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+ this.taskExecutorGateways = new HashMap<>();
}
@Override
@@ -77,7 +102,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
try {
super.start();
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
- leaderElectionService.start(new ResourceManagerLeaderContender());
+ leaderElectionService.start(this);
} catch (Throwable e) {
log.error("A fatal error happened when starting the ResourceManager", e);
throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
@@ -88,8 +113,11 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
public void shutDown() {
try {
leaderElectionService.stop();
+ for(JobID jobID : jobMasterGateways.keySet()) {
+ highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+ }
super.shutDown();
- } catch(Throwable e) {
+ } catch (Throwable e) {
log.error("A fatal error happened when shutdown the ResourceManager", e);
throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
}
@@ -102,48 +130,79 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
*/
@VisibleForTesting
UUID getLeaderSessionID() {
- return leaderSessionID;
+ return this.leaderSessionID;
}
/**
* Register a {@link JobMaster} at the resource manager.
*
- * @param jobMasterRegistration Job master registration information
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param jobMasterAddress The address of the JobMaster that registers
+ * @param jobID The Job ID of the JobMaster that registers
* @return Future registration response
*/
@RpcMethod
- public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
- Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
+ public Future<RegistrationResponse> registerJobMaster(
+ final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
+ final String jobMasterAddress, final JobID jobID) {
- return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
- @Override
- public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
- InstanceID instanceID;
+ checkNotNull(jobMasterAddress);
+ checkNotNull(jobID);
- if (jobMasterGateways.containsKey(jobMasterGateway)) {
- instanceID = jobMasterGateways.get(jobMasterGateway);
- } else {
- instanceID = new InstanceID();
- jobMasterGateways.put(jobMasterGateway, instanceID);
- }
+ return getRpcService()
+ .execute(new Callable<JobMasterGateway>() {
+ @Override
+ public JobMasterGateway call() throws Exception {
- return new TaskExecutorRegistrationSuccess(instanceID, 5000);
- }
- }, getMainThreadExecutionContext());
- }
+ if (!leaderSessionID.equals(resourceManagerLeaderId)) {
+ log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
+ " did not equal the received leader session ID {}",
+ jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
+ throw new Exception("Invalid leader session id");
+ }
- /**
- * Requests a slot from the resource manager.
- *
- * @param slotRequest Slot request
- * @return Slot assignment
- */
- @RpcMethod
- public SlotAssignment requestSlot(SlotRequest slotRequest) {
- System.out.println("SlotRequest: " + slotRequest);
- return new SlotAssignment();
- }
+ final LeaderConnectionInfo jobMasterLeaderInfo;
+ try {
+ jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+ highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+ throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
+ }
+
+ if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
+ LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
+ throw new Exception("JobManager is not leading");
+ }
+ return getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, TimeUnit.SECONDS);
+ }
+ })
+ .handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>() {
+ @Override
+ public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable throwable) {
+
+ if (throwable != null) {
+ return new RegistrationResponse.Decline(throwable.getMessage());
+ } else {
+ JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+ try {
+ LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+ jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+ } catch (Exception e) {
+ LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+ return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+ }
+ jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
+ final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
+ if (existingGateway != null) {
+ log.info("Replacing gateway for registered JobID {}.", jobID);
+ }
+ return new JobMasterRegistrationSuccess(5000);
+ }
+ }
+ }, getMainThreadExecutor());
+ }
/**
* Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager
@@ -160,90 +219,129 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
final String taskExecutorAddress,
final ResourceID resourceID) {
- if(!leaderSessionID.equals(resourceManagerLeaderId)) {
- log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}",
- resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
- return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+ return getRpcService().execute(new Callable<TaskExecutorGateway>() {
+ @Override
+ public TaskExecutorGateway call() throws Exception {
+ if (!leaderSessionID.equals(resourceManagerLeaderId)) {
+ log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " +
+ "not equal the received leader session ID {}",
+ resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+ throw new Exception("Invalid leader session id");
+ }
+
+ return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
+ }
+ }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
+ @Override
+ public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+ if (throwable != null) {
+ return new RegistrationResponse.Decline(throwable.getMessage());
+ } else {
+ InstanceID id = new InstanceID();
+ TaskExecutorRegistration oldTaskExecutor =
+ taskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, id));
+ if (oldTaskExecutor != null) {
+ log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+ }
+ return new TaskExecutorRegistrationSuccess(id, 5000);
+ }
+ }
+ }, getMainThreadExecutor());
+ }
+
+ /**
+ * Requests a slot from the resource manager.
+ *
+ * @param slotRequest Slot request
+ * @return Slot assignment
+ */
+ @RpcMethod
+ public SlotRequestReply requestSlot(SlotRequest slotRequest) {
+ final JobID jobId = slotRequest.getJobId();
+ final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+
+ if (jobMasterGateway != null) {
+ return slotManager.requestSlot(slotRequest);
+ } else {
+ LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+ return new SlotRequestRejected(slotRequest.getAllocationId());
}
+ }
- Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
- return taskExecutorGatewayFuture.map(new Mapper<TaskExecutorGateway, RegistrationResponse>() {
+
+ // ------------------------------------------------------------------------
+ // Leader Contender
+ // ------------------------------------------------------------------------
+
+ /**
+ * Callback method when current resourceManager is granted leadership
+ *
+ * @param leaderSessionID unique leadershipID
+ */
+ @Override
+ public void grantLeadership(final UUID leaderSessionID) {
+ runAsync(new Runnable() {
@Override
- public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
- InstanceID instanceID = null;
- TaskExecutorRegistration taskExecutorRegistration = startedTaskExecutorGateways.get(resourceID);
- if(taskExecutorRegistration != null) {
- log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
- instanceID = taskExecutorRegistration.getInstanceID();
- } else {
- instanceID = new InstanceID();
- startedTaskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, instanceID));
- }
+ public void run() {
+ log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+ // confirming the leader session ID might be blocking,
+ leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+ // notify SlotManager
+ slotManager.setLeaderUUID(leaderSessionID);
+ ResourceManager.this.leaderSessionID = leaderSessionID;
+ }
+ });
+ }
- return new TaskExecutorRegistrationSuccess(instanceID, 5000);
+ /**
+ * Callback method when current resourceManager lose leadership.
+ */
+ @Override
+ public void revokeLeadership() {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was revoked leadership.", getAddress());
+ jobMasterGateways.clear();
+ taskExecutorGateways.clear();
+ slotManager.clearState();
+ leaderSessionID = null;
}
- }, getMainThreadExecutionContext());
+ });
}
+ /**
+ * Handles error occurring in the leader election service
+ *
+ * @param exception Exception being thrown in the leader election service
+ */
+ @Override
+ public void handleError(final Exception exception) {
+ log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+ // terminate ResourceManager in case of an error
+ shutDown();
+ }
- private class ResourceManagerLeaderContender implements LeaderContender {
+ private static class JobMasterLeaderListener implements LeaderRetrievalListener {
- /**
- * Callback method when current resourceManager is granted leadership
- *
- * @param leaderSessionID unique leadershipID
- */
- @Override
- public void grantLeadership(final UUID leaderSessionID) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
- ResourceManager.this.leaderSessionID = leaderSessionID;
- // confirming the leader session ID might be blocking,
- leaderElectionService.confirmLeaderSessionID(leaderSessionID);
- }
- });
- }
+ private final JobID jobID;
+ private UUID leaderID;
- /**
- * Callback method when current resourceManager lose leadership.
- */
- @Override
- public void revokeLeadership() {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("ResourceManager {} was revoked leadership.", getAddress());
- jobMasterGateways.clear();
- startedTaskExecutorGateways.clear();
- leaderSessionID = null;
- }
- });
+ private JobMasterLeaderListener(JobID jobID) {
+ this.jobID = jobID;
}
@Override
- public String getAddress() {
- return ResourceManager.this.getAddress();
+ public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+ this.leaderID = leaderSessionID;
}
- /**
- * Handles error occurring in the leader election service
- *
- * @param exception Exception being thrown in the leader election service
- */
@Override
public void handleError(final Exception exception) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("ResourceManager received an error from the LeaderElectionService.", exception);
- // terminate ResourceManager in case of an error
- shutDown();
- }
- });
+ // TODO
}
}
}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 30a096f..d8b8ebe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,15 +18,16 @@
package org.apache.flink.runtime.resourcemanager;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.jobmaster.JobMaster;
-
import org.apache.flink.runtime.registration.RegistrationResponse;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+import org.apache.flink.runtime.registration.RegistrationResponse;
import java.util.UUID;
/**
@@ -37,21 +38,18 @@ public interface ResourceManagerGateway extends RpcGateway {
/**
* Register a {@link JobMaster} at the resource manager.
*
- * @param jobMasterRegistration Job master registration information
- * @param timeout Timeout for the future to complete
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param jobMasterAddress The address of the JobMaster that registers
+ * @param jobID The Job ID of the JobMaster that registers
+ * @param timeout Timeout for the future to complete
* @return Future registration response
*/
Future<RegistrationResponse> registerJobMaster(
- JobMasterRegistration jobMasterRegistration,
- @RpcTimeout FiniteDuration timeout);
+ UUID resourceManagerLeaderId,
+ String jobMasterAddress,
+ JobID jobID,
+ @RpcTimeout Time timeout);
- /**
- * Register a {@link JobMaster} at the resource manager.
- *
- * @param jobMasterRegistration Job master registration information
- * @return Future registration response
- */
- Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
/**
* Requests a slot from the resource manager.
@@ -59,15 +57,15 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param slotRequest Slot request
* @return Future slot assignment
*/
- Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+ Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
/**
* Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.
*
* @param resourceManagerLeaderId The fencing token for the ResourceManager leader
- * @param taskExecutorAddress The address of the TaskExecutor that registers
- * @param resourceID The resource ID of the TaskExecutor that registers
- * @param timeout The timeout for the response.
+ * @param taskExecutorAddress The address of the TaskExecutor that registers
+ * @param resourceID The resource ID of the TaskExecutor that registers
+ * @param timeout The timeout for the response.
*
* @return The future to the response by the ResourceManager.
*/
@@ -75,5 +73,5 @@ public interface ResourceManagerGateway extends RpcGateway {
UUID resourceManagerLeaderId,
String taskExecutorAddress,
ResourceID resourceID,
- @RpcTimeout FiniteDuration timeout);
+ @RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
index bd78a47..f8dfdc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
@@ -35,7 +35,7 @@ public class TaskExecutorRegistration implements Serializable {
private InstanceID instanceID;
public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
- InstanceID instanceID) {
+ InstanceID instanceID) {
this.taskExecutorGateway = taskExecutorGateway;
this.instanceID = instanceID;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 5d0013c..a6d2196 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
new file mode 100644
index 0000000..332c093
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+/** Tests for job master registration at the {@link ResourceManager} via {@code registerJobMaster}. */
+public class ResourceManagerJobMasterTest {
+	private TestingSerialRpcService rpcService;
+
+	@Before
+	public void setup() throws Exception {
+		rpcService = new TestingSerialRpcService();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		rpcService.stopService();
+	}
+
+	/**
+	 * Tests that a registration carrying the current RM leader session id and the job master's leader id and address is accepted
+	 */
+	@Test
+	public void testRegisterJobMaster() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// all identifiers match, so the registration must succeed
+		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID);
+		RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+		assertTrue(response instanceof JobMasterRegistrationSuccess);
+	}
+
+	/**
+	 * Tests that a registration carrying an unmatched resource manager leader session id is declined
+	 */
+	@Test
+	public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// everything matches except the resource manager leader session id
+		UUID differentLeaderSessionID = UUID.randomUUID();
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID);
+		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	/**
+	 * Tests that a registration carrying an unmatched job master leader id is declined
+	 */
+	@Test
+	public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// everything matches except the job master leader id (RM leadership is granted only once, so rmLeaderSessionId stays valid)
+		UUID differentLeaderSessionID = UUID.randomUUID();
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID);
+		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	/**
+	 * Tests that a registration from an address without a registered job master gateway is declined
+	 */
+	@Test
+	public void testRegisterJobMasterFromInvalidAddress() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// everything matches except the address, for which no gateway was registered with the rpc service
+		String invalidAddress = "/jobMasterAddress2";
+		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, invalidAddress, jobID);
+		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	/**
+	 * Tests that a registration is declined when no job master leader retrieval listener can be started for the job id
+	 */
+	@Test
+	public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
+		String jobMasterAddress = "/jobMasterAddress1";
+		JobID jobID = mockJobMaster(jobMasterAddress);
+		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+		UUID jmLeaderID = UUID.randomUUID();
+		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+		// the HA services only know a leader retriever for jobID, not for this freshly generated job id
+		JobID unknownJobIDToHAServices = new JobID();
+		Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, jobMasterAddress, unknownJobIDToHAServices);
+		RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
+		assertTrue(response instanceof RegistrationResponse.Decline);
+	}
+
+	private JobID mockJobMaster(String jobMasterAddress) {
+		JobID jobID = new JobID();
+		JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+		rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
+		return jobID;
+	}
+
+	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+		highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
+		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+		resourceManager.start();
+		return resourceManager;
+	}
+
+	private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) {
+		UUID leaderSessionId = UUID.randomUUID();
+		resourceManagerLeaderElectionService.isLeader(leaderSessionId);
+		return leaderSessionId;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
new file mode 100644
index 0000000..ed7c7d7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/** Tests for task executor registration at the {@link ResourceManager} via {@code registerTaskExecutor}. */
+public class ResourceManagerTaskExecutorTest {
+	private TestingSerialRpcService rpcService;
+
+	@Before
+	public void setup() throws Exception {
+		rpcService = new TestingSerialRpcService();
+	}
+
+	@After
+	public void teardown() throws Exception {
+		rpcService.stopService();
+	}
+
+	/**
+	 * Tests that a task executor registration succeeds, and that a duplicate registration also succeeds but yields a new registration id
+	 */
+	@Test
+	public void testRegisterTaskExecutor() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+		final UUID leaderSessionId = grantLeadership(rmLeaderElectionService);
+
+		// first registration must succeed
+		Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
+		RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+		assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+		// duplicate registration must succeed as well, with a registration id different from the first one (bounded wait so a hang fails the test)
+		Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
+		RegistrationResponse duplicateResponse = duplicateFuture.get(5, TimeUnit.SECONDS);
+		assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
+		assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
+	}
+
+	/**
+	 * Tests that a registration carrying an unmatched resource manager leader session id is declined
+	 */
+	@Test
+	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+		grantLeadership(rmLeaderElectionService);
+
+		// a leader session id other than the granted one must lead to a Decline
+		UUID differentLeaderSessionID = UUID.randomUUID();
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
+		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	/**
+	 * Tests that a registration from an address without a registered task executor gateway is declined
+	 */
+	@Test
+	public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
+		String taskExecutorAddress = "/taskExecutor1";
+		ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService);
+		final UUID leaderSessionId = grantLeadership(rmLeaderElectionService);
+
+		// no gateway was registered with the rpc service for this address, so the registration must be declined
+		String invalidAddress = "/taskExecutor2";
+		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID);
+		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+	}
+
+	private ResourceID mockTaskExecutor(String taskExecutorAddress) {
+		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		ResourceID taskExecutorResourceID = ResourceID.generate();
+		rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
+		return taskExecutorResourceID;
+	}
+
+	private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) {
+		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+		ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+		resourceManager.start();
+		return resourceManager;
+	}
+
+	private UUID grantLeadership(TestingLeaderElectionService leaderElectionService) {
+		UUID leaderSessionId = UUID.randomUUID();
+		leaderElectionService.isLeader(leaderSessionId);
+		return leaderSessionId;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
deleted file mode 100644
index b75d9b8..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class ResourceManagerTest {
-
- private TestingSerialRpcService rpcService;
-
- @Before
- public void setup() throws Exception {
- rpcService = new TestingSerialRpcService();
- }
-
- @After
- public void teardown() throws Exception {
- rpcService.stopService();
- }
-
- /**
- * Test receive normal registration from task executor and receive duplicate registration from task executor
- *
- * @throws Exception
- */
- @Test
- public void testRegisterTaskExecutor() throws Exception {
- String taskExecutorAddress = "/taskExecutor1";
- ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
- TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
- final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
- final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
-
- // test response successful
- Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
- RegistrationResponse response = Await.result(successfulFuture, new FiniteDuration(0, TimeUnit.SECONDS));
- assertTrue(response instanceof TaskExecutorRegistrationSuccess);
-
- // test response successful with previous instanceID when receive duplicate registration from taskExecutor
- Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID);
- RegistrationResponse duplicateResponse = Await.result(duplicateFuture, new FiniteDuration(0, TimeUnit.SECONDS));
- assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
- assertEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
- }
-
- /**
- * Test receive registration with unmatched leadershipId from task executor
- *
- * @throws Exception
- */
- @Test(expected = LeaderSessionIDException.class)
- public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
- String taskExecutorAddress = "/taskExecutor1";
- ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
- TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
- final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
- final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
-
- // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
- UUID differentLeaderSessionID = UUID.randomUUID();
- Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID);
- Await.result(unMatchedLeaderFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
- }
-
- /**
- * Test receive registration with invalid address from task executor
- *
- * @throws Exception
- */
- @Test(expected = Exception.class)
- public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
- String taskExecutorAddress = "/taskExecutor1";
- ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
- TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
- final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService);
- final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService);
-
- // test throw exception when receive a registration from taskExecutor which takes invalid address
- String invalidAddress = "/taskExecutor2";
- Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID);
- Await.result(invalidAddressFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
- }
-
- private ResourceID mockTaskExecutor(String taskExecutorAddress) {
- TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- ResourceID taskExecutorResourceID = ResourceID.generate();
- rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
- return taskExecutorResourceID;
- }
-
- private ResourceManager createAndStartResourceManager(TestingLeaderElectionService leaderElectionService) {
- TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
- highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
- ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
- resourceManager.start();
- return resourceManager;
- }
-
- private UUID grantResourceManagerLeadership(TestingLeaderElectionService leaderElectionService) {
- UUID leaderSessionId = UUID.randomUUID();
- leaderElectionService.isLeader(leaderSessionId);
- return leaderSessionId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 1f9e7e8..0232fab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,10 +24,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
-import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
@@ -88,14 +92,20 @@ public class SlotProtocolTest extends TestLogger {
testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+ final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices();
+ final UUID rmLeaderID = UUID.randomUUID();
+ final UUID jmLeaderID = UUID.randomUUID();
+ TestingLeaderElectionService rmLeaderElectionService =
+ configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
ResourceManager resourceManager =
- new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+ new ResourceManager(testRpcService, testingHaServices, slotManager);
resourceManager.start();
+ rmLeaderElectionService.isLeader(rmLeaderID);
Future<RegistrationResponse> registrationFuture =
- resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+ resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
try {
registrationFuture.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
@@ -158,16 +168,23 @@ public class SlotProtocolTest extends TestLogger {
testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+ final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices();
+ final UUID rmLeaderID = UUID.randomUUID();
+ final UUID jmLeaderID = UUID.randomUUID();
+ TestingLeaderElectionService rmLeaderElectionService =
+ configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
+
TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
testRpcService.registerGateway(tmAddress, taskExecutorGateway);
TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
ResourceManager resourceManager =
- new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+ new ResourceManager(testRpcService, testingHaServices, slotManager);
resourceManager.start();
+ rmLeaderElectionService.isLeader(rmLeaderID);
Future<RegistrationResponse> registrationFuture =
- resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
+ resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
try {
registrationFuture.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
@@ -208,6 +225,20 @@ public class SlotProtocolTest extends TestLogger {
verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
}
+	/** Installs RM and JM leader election/retrieval test services into {@code testingHA}; returns the RM leader election service. */
+	private static TestingLeaderElectionService configureHA(
+			TestingHighAvailabilityServices testingHA, JobID jobID, String rmAddress, UUID rmID, String jmAddress, UUID jmID) {
+		final TestingLeaderElectionService resourceManagerElection = new TestingLeaderElectionService();
+		final TestingLeaderElectionService jobMasterElection = new TestingLeaderElectionService();
+		testingHA.setResourceManagerLeaderElectionService(resourceManagerElection);
+		testingHA.setJobMasterLeaderElectionService(jobMasterElection);
+
+		// retrievers are constructed with the expected leader address and leader session id
+		testingHA.setResourceManagerLeaderRetriever(new TestingLeaderRetrievalService(rmAddress, rmID));
+		testingHA.setJobMasterLeaderRetriever(jobID, new TestingLeaderRetrievalService(jmAddress, jmID));
+
+		return resourceManagerElection;
+	}
private static class TestingSlotManager extends SimpleSlotManager {