You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:45:55 UTC
[13/50] [abbrv] flink git commit: [FLINK-4537] [cluster management]
ResourceManager registration with JobManager
[FLINK-4537] [cluster management] ResourceManager registration with JobManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efc7de5b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efc7de5b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efc7de5b
Branch: refs/heads/flip-6
Commit: efc7de5bd3bff0512c20485f94d563c9e9cea5ec
Parents: f4dc474
Author: beyond1920 <be...@126.com>
Authored: Thu Sep 1 15:27:20 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:40 2016 +0200
----------------------------------------------------------------------
.../HighAvailabilityServices.java | 9 ++
.../runtime/highavailability/NonHaServices.java | 19 +++
.../jobmaster/JobMasterRegistrationSuccess.java | 49 ++++++
.../resourcemanager/JobMasterRegistration.java | 39 ++++-
.../resourcemanager/ResourceManager.java | 125 +++++++++++++--
.../resourcemanager/ResourceManagerGateway.java | 34 ++--
.../exceptions/LeaderSessionIDException.java | 60 +++++++
.../runtime/taskexecutor/TaskExecutor.java | 5 +
.../TestingHighAvailabilityServices.java | 17 ++
.../resourcemanager/ResourceManagerTest.java | 160 +++++++++++++++++++
10 files changed, 483 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 298147c..7634176 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -40,6 +40,15 @@ public interface HighAvailabilityServices {
LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
/**
+ * Gets the leader retriever for the JobMaster which is responsible for the given job.
+ *
+ * @param jobID The identifier of the job.
+ * @return The leader retrieval service for the JobMaster of the given job.
+ * @throws Exception Declared by the interface; the concrete failure conditions depend on the implementation.
+ */
+ LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception;
+
+ /**
* Gets the leader election service for the cluster's resource manager.
* @return
* @throws Exception
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 292a404..33dc2d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -42,6 +43,8 @@ public class NonHaServices implements HighAvailabilityServices {
/** The fix address of the ResourceManager */
private final String resourceManagerAddress;
+ private final ConcurrentHashMap<JobID, String> jobMastersAddress;
+
/**
* Creates a new services class for the fix pre-defined leaders.
*
@@ -49,6 +52,17 @@ public class NonHaServices implements HighAvailabilityServices {
*/
public NonHaServices(String resourceManagerAddress) {
this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
+ this.jobMastersAddress = new ConcurrentHashMap<>(16);
+ }
+
+ /**
+ * Binds address of a specified job master
+ *
+ * @param jobID JobID for the specified job master
+ * @param jobMasterAddress address for the specified job master
+ */
+ public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) {
+ jobMastersAddress.put(jobID, jobMasterAddress);
}
// ------------------------------------------------------------------------
@@ -61,6 +75,11 @@ public class NonHaServices implements HighAvailabilityServices {
}
@Override
+ public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+ return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
+ }
+
+ @Override
public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
return new StandaloneLeaderElectionService();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
new file mode 100644
index 0000000..031c38e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.registration.RegistrationResponse;
+
+/**
+ * Response sent by the ResourceManager to a JobMaster whose registration attempt succeeded.
+ *
+ * <p>Carries the heartbeat interval that was passed to the constructor; the value is stored
+ * as-is and exposed through {@link #getHeartbeatInterval()} and {@link #toString()}.
+ * The time unit is not defined in this class (presumably milliseconds - TODO confirm).
+ */
+public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {
+
+ private static final long serialVersionUID = 5577641250204140415L;
+
+ private final long heartbeatInterval;
+
+ public JobMasterRegistrationSuccess(long heartbeatInterval) {
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ /**
+ * Gets the interval in which the ResourceManager will heartbeat the JobMaster.
+ *
+ * @return the interval in which the ResourceManager will heartbeat the JobMaster
+ */
+ public long getHeartbeatInterval() {
+ return heartbeatInterval;
+ }
+
+ @Override
+ public String toString() {
+ return "JobMasterRegistrationSuccess(" + heartbeatInterval + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 439e56b..7b8ec70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,23 +18,56 @@
package org.apache.flink.runtime.resourcemanager;
+<<<<<<< HEAD
import org.apache.flink.api.common.JobID;
+=======
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+>>>>>>> db98efb... rsourceManager registration with JobManager
import java.io.Serializable;
+import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class groups the JobMasterGateway and the LeaderSessionID of a registered job master.
+ */
public class JobMasterRegistration implements Serializable {
- private static final long serialVersionUID = 8411214999193765202L;
+<<<<<<< HEAD
private final String address;
private final JobID jobID;
public JobMasterRegistration(String address, JobID jobID) {
this.address = address;
this.jobID = jobID;
+=======
+ private static final long serialVersionUID = -2316627821716999527L;
+
+ private final JobMasterGateway jobMasterGateway;
+
+ private UUID jobMasterLeaderSessionID;
+
+ public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
+ this.jobMasterGateway = checkNotNull(jobMasterGateway);
+ }
+
+ public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) {
+ this.jobMasterGateway = checkNotNull(jobMasterGateway);
+ this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
+ }
+
+ public JobMasterGateway getJobMasterGateway() {
+ return jobMasterGateway;
+ }
+
+ /**
+ * Updates the leader session ID recorded for the registered JobMaster.
+ *
+ * @param leaderSessionID the new leader session ID of the JobMaster; stored as-is
+ */
+ public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
+ // Assign the parameter: the previous code assigned the field to itself and dropped the update.
+ this.jobMasterLeaderSessionID = leaderSessionID;
+>>>>>>> db98efb... rsourceManager registration with JobManager
}
- public String getAddress() {
- return address;
+ public UUID getJobMasterLeaderSessionID() {
+ return jobMasterLeaderSessionID;
}
public JobID getJobID() {
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 5370710..8be1455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.resourcemanager;
+import akka.dispatch.Futures;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -25,15 +26,22 @@ import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+<<<<<<< HEAD
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+=======
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+>>>>>>> db98efb... rsourceManager registration with JobManager
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.registration.RegistrationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,15 +58,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* It offers the following methods as part of its rpc interface to interact with it remotely:
* <ul>
- * <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
+ * <li>{@link #registerJobMaster(UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
* <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
* </ul>
*/
+<<<<<<< HEAD
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
private final Logger LOG = LoggerFactory.getLogger(getClass());
private final Map<JobID, JobMasterGateway> jobMasterGateways;
+=======
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+ /** Maps each JobID to the JobMasterRegistration (gateway and leader session ID) of its JobMaster */
+ private final Map<JobID, JobMasterRegistration> jobMasters;
+>>>>>>> db98efb... rsourceManager registration with JobManager
private final HighAvailabilityServices highAvailabilityServices;
@@ -74,8 +88,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
SlotManager slotManager) {
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
+<<<<<<< HEAD
this.jobMasterGateways = new HashMap<>();
this.slotManager = slotManager;
+=======
+ this.jobMasters = new HashMap<>(16);
+>>>>>>> db98efb... rsourceManager registration with JobManager
}
@Override
@@ -95,8 +113,11 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
public void shutDown() {
try {
leaderElectionService.stop();
+ for(JobID jobID : jobMasters.keySet()) {
+ highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+ }
super.shutDown();
- } catch(Throwable e) {
+ } catch (Throwable e) {
log.error("A fatal error happened when shutdown the ResourceManager", e);
throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
}
@@ -115,24 +136,58 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
/**
* Register a {@link JobMaster} at the resource manager.
*
- * @param jobMasterRegistration Job master registration information
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param jobMasterAddress The address of the JobMaster that registers
+ * @param jobID The Job ID of the JobMaster that registers
* @return Future registration response
*/
@RpcMethod
+<<<<<<< HEAD
public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
final Future<JobMasterGateway> jobMasterFuture =
getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
final JobID jobID = jobMasterRegistration.getJobID();
+=======
+ public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {
+
+ if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+ log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}",
+ jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
+ return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+ }
+
+ Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
+>>>>>>> db98efb... rsourceManager registration with JobManager
return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() {
@Override
public RegistrationResponse apply(JobMasterGateway jobMasterGateway) {
+<<<<<<< HEAD
final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
if (existingGateway != null) {
LOG.info("Replacing existing gateway {} for JobID {} with {}.",
existingGateway, jobID, jobMasterGateway);
}
return new RegistrationResponse(true);
+=======
+ if (jobMasters.containsKey(jobID)) {
+ JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID());
+ jobMasters.put(jobID, jobMasterRegistration);
+ log.info("Replacing gateway for registered JobID {}.", jobID);
+ } else {
+ JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
+ jobMasters.put(jobID, jobMasterRegistration);
+ try {
+ highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID));
+ } catch(Throwable e) {
+ log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster",
+ jobID, jobMasterAddress);
+ return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster");
+ }
+ }
+
+ return new JobMasterRegistrationSuccess(5000);
+>>>>>>> db98efb... rsourceManager registration with JobManager
}
}, getMainThreadExecutor());
}
@@ -158,26 +213,41 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
/**
- *
- * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
- * @param taskExecutorAddress The address of the TaskExecutor that registers
- * @param resourceID The resource ID of the TaskExecutor that registers
- *
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param taskExecutorAddress The address of the TaskExecutor that registers
+ * @param resourceID The resource ID of the TaskExecutor that registers
* @return The response by the ResourceManager.
*/
@RpcMethod
- public org.apache.flink.runtime.registration.RegistrationResponse registerTaskExecutor(
- UUID resourceManagerLeaderId,
- String taskExecutorAddress,
- ResourceID resourceID) {
+ public RegistrationResponse registerTaskExecutor(
+ UUID resourceManagerLeaderId,
+ String taskExecutorAddress,
+ ResourceID resourceID) {
return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
}
+<<<<<<< HEAD
// ------------------------------------------------------------------------
// Leader Contender
// ------------------------------------------------------------------------
+=======
+ /**
+ * Callback method invoked when the current ResourceManager loses leadership.
+ */
+ @Override
+ public void revokeLeadership() {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was revoked leadership.", getAddress());
+ jobMasters.clear();
+ leaderSessionID = null;
+ }
+ });
+ }
+>>>>>>> db98efb... rsourceManager registration with JobManager
/**
* Callback method when current resourceManager is granted leadership
@@ -232,4 +302,35 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme
}
});
}
+
+ /**
+ * Listener that records the leader session ID announced for the JobMaster of one job.
+ *
+ * <p>Both callbacks hand their work to {@code runAsync}, so the registration map is only
+ * touched from the code scheduled there.
+ */
+ private class JobMasterLeaderListener implements LeaderRetrievalListener {
+ private final JobID jobID;
+
+ private JobMasterLeaderListener(JobID jobID) {
+ this.jobID = jobID;
+ }
+
+ /**
+ * Stores the announced leader session ID in the registration of this listener's job.
+ *
+ * @param leaderAddress address of the new JobMaster leader (only logged here)
+ * @param leaderSessionID leader session ID of the new JobMaster leader
+ */
+ @Override
+ public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID);
+ // update job master leader session id
+ JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID);
+ if (jobMasterRegistration == null) {
+ // The registration map can be cleared (e.g. on leadership revocation) while this
+ // listener is still subscribed; skip the update instead of failing with an NPE.
+ log.warn("Ignoring leader notification for JobMaster {} because it is no longer registered.", jobID);
+ return;
+ }
+ jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
+ }
+ });
+ }
+
+ /**
+ * Logs an error reported by the leader retrieval service; no further action is taken.
+ *
+ * @param exception the error reported by the LeaderRetrievalService
+ */
+ @Override
+ public void handleError(final Exception exception) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception);
+ }
+ });
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 5c8786c..1ee11a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,12 +18,13 @@
package org.apache.flink.runtime.resourcemanager;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.registration.RegistrationResponse;
import java.util.UUID;
@@ -35,21 +36,18 @@ public interface ResourceManagerGateway extends RpcGateway {
/**
* Register a {@link JobMaster} at the resource manager.
*
- * @param jobMasterRegistration Job master registration information
- * @param timeout Timeout for the future to complete
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param jobMasterAddress The address of the JobMaster that registers
+ * @param jobID The Job ID of the JobMaster that registers
+ * @param timeout Timeout for the future to complete
* @return Future registration response
*/
Future<RegistrationResponse> registerJobMaster(
- JobMasterRegistration jobMasterRegistration,
- @RpcTimeout Time timeout);
+ UUID resourceManagerLeaderId,
+ String jobMasterAddress,
+ JobID jobID,
+ @RpcTimeout Time timeout);
- /**
- * Register a {@link JobMaster} at the resource manager.
- *
- * @param jobMasterRegistration Job master registration information
- * @return Future registration response
- */
- Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
/**
* Requests a slot from the resource manager.
@@ -60,15 +58,13 @@ public interface ResourceManagerGateway extends RpcGateway {
Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
/**
- *
- * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
- * @param taskExecutorAddress The address of the TaskExecutor that registers
- * @param resourceID The resource ID of the TaskExecutor that registers
- * @param timeout The timeout for the response.
- *
+ * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
+ * @param taskExecutorAddress The address of the TaskExecutor that registers
+ * @param resourceID The resource ID of the TaskExecutor that registers
+ * @param timeout The timeout for the response.
* @return The future to the response by the ResourceManager.
*/
- Future<org.apache.flink.runtime.registration.RegistrationResponse> registerTaskExecutor(
+ Future<RegistrationResponse> registerTaskExecutor(
UUID resourceManagerLeaderId,
String taskExecutorAddress,
ResourceID resourceID,
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
new file mode 100644
index 0000000..cd14a0d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An exception specifying that the received leader session ID is not the same as expected.
+ *
+ * <p>Either ID may be {@code null} (for example when a caller sends no leader session ID);
+ * the exception must still be constructible in that case so the mismatch can be reported.
+ */
+public class LeaderSessionIDException extends Exception {
+
+ private static final long serialVersionUID = -3276145308053264636L;
+
+ /** expected leader session id, may be null */
+ private final UUID expectedLeaderSessionID;
+
+ /** actual leader session id, may be null */
+ private final UUID actualLeaderSessionID;
+
+ /**
+ * Creates the exception for a mismatching pair of leader session IDs.
+ *
+ * @param expectedLeaderSessionID the leader session ID the receiver expected, may be null
+ * @param actualLeaderSessionID the leader session ID that was received, may be null
+ */
+ public LeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
+ super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
+ // No null checks here: rejecting a null ID would replace the mismatch report
+ // with a NullPointerException thrown from the error path itself.
+ this.expectedLeaderSessionID = expectedLeaderSessionID;
+ this.actualLeaderSessionID = actualLeaderSessionID;
+ }
+
+ /**
+ * Get expected leader session id
+ *
+ * @return expected leader session id, may be null
+ */
+ public UUID getExpectedLeaderSessionID() {
+ return expectedLeaderSessionID;
+ }
+
+ /**
+ * Get actual leader session id
+ *
+ * @return actual leader session id, may be null
+ */
+ public UUID getActualLeaderSessionID() {
+ return actualLeaderSessionID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d84a6a9..cf709c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -327,6 +327,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
@Override
+ public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+ return null;
+ }
+
+ @Override
public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3162f40..2ac43be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.highavailability;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -30,6 +31,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
+ private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
+
private volatile LeaderElectionService jobMasterLeaderElectionService;
private volatile LeaderElectionService resourceManagerLeaderElectionService;
@@ -43,6 +46,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
}
+ public void setJobMasterLeaderRetriever(JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
+ this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
+ }
+
public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
this.jobMasterLeaderElectionService = leaderElectionService;
}
@@ -66,6 +73,16 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
+ public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+ LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
+ if (service != null) {
+ return service;
+ } else {
+ throw new IllegalStateException("JobMasterLeaderRetriever has not been set");
+ }
+ }
+
+ @Override
public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
LeaderElectionService service = jobMasterLeaderElectionService;
http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
new file mode 100644
index 0000000..4d04001
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class ResourceManagerTest {
+
+ private TestingSerialRpcService rpcService;
+
+ @Before
+ public void setup() throws Exception {
+ rpcService = new TestingSerialRpcService(); // every test method starts with its own RPC service instance
+ }
+
+ @After
+ public void teardown() throws Exception {
+ rpcService.stopService(); // stop the RPC service that setup() created for this test
+ }
+
+ /**
+ * Verifies that a job master registering with the leader session id granted to the
+ * resource manager is answered with a {@link JobMasterRegistrationSuccess}.
+ * @throws Exception
+ */
+ @Test
+ public void testRegisterJobMaster() throws Exception {
+ final String address = "/jobMasterAddress1";
+ final JobID jobID = mockJobMaster(address);
+ final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+ final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
+ final ResourceManager rm = createAndStartResourceManager(rmLeaderElectionService, jobID, jmLeaderRetrievalService);
+ final UUID rmLeaderSessionId = grantResourceManagerLeadership(rmLeaderElectionService);
+
+ // the registration must be acknowledged as successful
+ final Future<RegistrationResponse> registration = rm.registerJobMaster(rmLeaderSessionId, address, jobID);
+ final FiniteDuration noWait = new FiniteDuration(0, TimeUnit.SECONDS);
+ assertTrue(Await.result(registration, noWait) instanceof JobMasterRegistrationSuccess);
+ }
+
+ /**
+ * Verifies that a registration carrying a leader session id other than the one granted
+ * to the resource manager fails with a {@link LeaderSessionIDException}.
+ * @throws Exception
+ */
+ @Test(expected = LeaderSessionIDException.class)
+ public void testRegisterJobMasterWithUnmatchedLeaderSessionId() throws Exception {
+ String jobMasterAddress = "/jobMasterAddress1";
+ JobID jobID = mockJobMaster(jobMasterAddress);
+ TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+ TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService();
+ final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService);
+ grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+ // leadership was granted above; register with a freshly generated id instead of the granted one
+ UUID differentLeaderSessionID = UUID.randomUUID();
+ Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jobMasterAddress, jobID);
+ Await.result(unMatchedLeaderFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Verifies that registering a job master under an address for which no gateway has been
+ * registered with the RPC service fails with an exception.
+ * @throws Exception
+ */
+ @Test(expected = Exception.class)
+ public void testRegisterJobMasterFromInvalidAddress() throws Exception {
+ final String registeredAddress = "/jobMasterAddress1";
+ final JobID jobID = mockJobMaster(registeredAddress);
+ final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+ final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
+ final ResourceManager rm = createAndStartResourceManager(rmLeaderElectionService, jobID, jmLeaderRetrievalService);
+ final UUID rmLeaderSessionId = grantResourceManagerLeadership(rmLeaderElectionService);
+
+ // only registeredAddress has a gateway; the address used below is unknown to the RPC service
+ final String unregisteredAddress = "/jobMasterAddress2";
+ final Future<RegistrationResponse> registration = rm.registerJobMaster(rmLeaderSessionId, unregisteredAddress, jobID);
+ Await.result(registration, new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Verifies that registering a job for which the high availability services hold no leader
+ * retrieval service is answered with a {@link RegistrationResponse.Decline}.
+ * @throws Exception
+ */
+ @Test
+ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
+ final String address = "/jobMasterAddress1";
+ final JobID knownJobID = mockJobMaster(address);
+ final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+ final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
+ final ResourceManager rm = createAndStartResourceManager(rmLeaderElectionService, knownJobID, jmLeaderRetrievalService);
+ final UUID rmLeaderSessionId = grantResourceManagerLeadership(rmLeaderElectionService);
+
+ // a leader retriever was only set for knownJobID, so this fresh id is unknown to the HA services
+ final JobID unknownJobID = new JobID();
+ final Future<RegistrationResponse> registration = rm.registerJobMaster(rmLeaderSessionId, address, unknownJobID);
+ final RegistrationResponse answer = Await.result(registration, new FiniteDuration(0, TimeUnit.SECONDS));
+ assertTrue(answer instanceof RegistrationResponse.Decline);
+ }
+
+ private JobID mockJobMaster(String jobMasterAddress) {
+ // Make a mocked gateway reachable under the given address, then hand out a new job id.
+ final JobMasterGateway mockedGateway = mock(JobMasterGateway.class);
+ rpcService.registerGateway(jobMasterAddress, mockedGateway);
+ return new JobID();
+ }
+
+ private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ haServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService);
+ haServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+ final ResourceManager startedResourceManager = new ResourceManager(rpcService, haServices);
+ startedResourceManager.start(); // callers receive the resource manager already started
+ return startedResourceManager;
+ }
+
+ private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) {
+ final UUID grantedSessionId = UUID.randomUUID();
+ resourceManagerLeaderElectionService.isLeader(grantedSessionId); // hand the fresh session id to the testing election service
+ return grantedSessionId;
+ }
+
+}