You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:31 UTC
[11/50] [abbrv] flink git commit: [FLINK-4516] leader election of
resourcemanager
[FLINK-4516] leader election of resourcemanager
- add serial rpc service
- add a special rpcService implementation which directly executes the asynchronous calls serially one by one, it is just for testcase
- Change ResourceManagerLeaderContender code and TestingSerialRpcService code
- override shutdown logic to stop leadershipService
- use a mocked RpcService rather than TestingSerialRpcService for resourceManager HA test
This closes #2427
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36e4bf90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36e4bf90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36e4bf90
Branch: refs/heads/flip-6
Commit: 36e4bf90c80e5ced109da014915bdd7d8c8d0631
Parents: 61050992
Author: beyond1920 <be...@126.com>
Authored: Sat Aug 27 14:14:28 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:40 2016 +0200
----------------------------------------------------------------------
.../HighAvailabilityServices.java | 7 +
.../runtime/highavailability/NonHaServices.java | 5 +
.../rpc/resourcemanager/ResourceManager.java | 111 +++++-
.../TestingHighAvailabilityServices.java | 19 +-
.../runtime/rpc/TestingSerialRpcService.java | 369 +++++++++++++++++++
.../resourcemanager/ResourceManagerHATest.java | 76 ++++
6 files changed, 578 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/36e4bf90/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 73e4f1f..298147c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -40,6 +40,13 @@ public interface HighAvailabilityServices {
LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
/**
+ * Gets the leader election service for the cluster's resource manager.
+ * @return
+ * @throws Exception
+ */
+ LeaderElectionService getResourceManagerLeaderElectionService() throws Exception;
+
+ /**
* Gets the leader election service for the given job.
*
* @param jobID The identifier of the job running the election.
http://git-wip-us.apache.org/repos/asf/flink/blob/36e4bf90/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 3d2769b..292a404 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -61,6 +61,11 @@ public class NonHaServices implements HighAvailabilityServices {
}
@Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ return new StandaloneLeaderElectionService();
+ }
+
+ @Override
public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
return new StandaloneLeaderElectionService();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/36e4bf90/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 6f34465..f7147c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -20,24 +20,26 @@ package org.apache.flink.runtime.rpc.resourcemanager;
import akka.dispatch.Mapper;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* ResourceManager implementation. The resource manager is responsible for resource de-/allocation
@@ -50,16 +52,51 @@ import java.util.concurrent.ExecutorService;
* </ul>
*/
public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
- private final ExecutionContext executionContext;
private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+ private final HighAvailabilityServices highAvailabilityServices;
+ private LeaderElectionService leaderElectionService = null;
+ private UUID leaderSessionID = null;
- public ResourceManager(RpcService rpcService, ExecutorService executorService) {
+ public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
super(rpcService);
- this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
- Preconditions.checkNotNull(executorService));
+ this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.jobMasterGateways = new HashMap<>();
}
+ @Override
+ public void start() {
+ // start a leader
+ try {
+ super.start();
+ leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
+ leaderElectionService.start(new ResourceManagerLeaderContender());
+ } catch (Throwable e) {
+ log.error("A fatal error happened when starting the ResourceManager", e);
+ throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
+ }
+ }
+
+ @Override
+ public void shutDown() {
+ try {
+ leaderElectionService.stop();
+ super.shutDown();
+ } catch(Throwable e) {
+ log.error("A fatal error happened when shutdown the ResourceManager", e);
+ throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
+ }
+ }
+
+ /**
+ * Gets the leader session id of current resourceManager.
+ *
+ * @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership.
+ */
+ @VisibleForTesting
+ UUID getLeaderSessionID() {
+ return leaderSessionID;
+ }
+
/**
* Register a {@link JobMaster} at the resource manager.
*
@@ -116,4 +153,62 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
}
+
+ private class ResourceManagerLeaderContender implements LeaderContender {
+
+ /**
+ * Callback method when current resourceManager is granted leadership
+ *
+ * @param leaderSessionID unique leadershipID
+ */
+ @Override
+ public void grantLeadership(final UUID leaderSessionID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+ ResourceManager.this.leaderSessionID = leaderSessionID;
+ // confirming the leader session ID might be blocking,
+ leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+ }
+ });
+ }
+
+ /**
+ * Callback method when current resourceManager lose leadership.
+ */
+ @Override
+ public void revokeLeadership() {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was revoked leadership.", getAddress());
+ jobMasterGateways.clear();
+ leaderSessionID = null;
+ }
+ });
+ }
+
+ @Override
+ public String getAddress() {
+ return ResourceManager.this.getAddress();
+ }
+
+ /**
+ * Handles error occurring in the leader election service
+ *
+ * @param exception Exception being thrown in the leader election service
+ */
+ @Override
+ public void handleError(final Exception exception) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+ // terminate ResourceManager in case of an error
+ shutDown();
+ }
+ });
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/36e4bf90/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 4d654a3..3162f40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -32,6 +32,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
private volatile LeaderElectionService jobMasterLeaderElectionService;
+ private volatile LeaderElectionService resourceManagerLeaderElectionService;
+
// ------------------------------------------------------------------------
// Setters for mock / testing implementations
@@ -44,7 +46,11 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
this.jobMasterLeaderElectionService = leaderElectionService;
}
-
+
+ public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) {
+ this.resourceManagerLeaderElectionService = leaderElectionService;
+ }
+
// ------------------------------------------------------------------------
// HA Services Methods
// ------------------------------------------------------------------------
@@ -69,4 +75,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
}
}
+
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ LeaderElectionService service = resourceManagerLeaderElectionService;
+
+ if (service != null) {
+ return service;
+ } else {
+ throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/36e4bf90/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
new file mode 100644
index 0000000..7bdbb99
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.BitSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
+ */
+public class TestingSerialRpcService implements RpcService {
+
+ private final DirectExecutorService executorService;
+ private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
+
+ public TestingSerialRpcService() {
+ executorService = new DirectExecutorService();
+ this.registeredConnections = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
+ try {
+ unit.sleep(delay);
+ runnable.run();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public ExecutionContext getExecutionContext() {
+ return ExecutionContexts.fromExecutorService(executorService);
+ }
+
+ @Override
+ public void stopService() {
+ executorService.shutdown();
+ registeredConnections.clear();
+ }
+
+ @Override
+ public void stopServer(RpcGateway selfGateway) {
+
+ }
+
+ @Override
+ public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
+ final String address = UUID.randomUUID().toString();
+
+ InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
+ ClassLoader classLoader = getClass().getClassLoader();
+
+ @SuppressWarnings("unchecked")
+ C self = (C) Proxy.newProxyInstance(
+ classLoader,
+ new Class<?>[]{
+ rpcEndpoint.getSelfGatewayType(),
+ MainThreadExecutor.class,
+ StartStoppable.class,
+ RpcGateway.class
+ },
+ akkaInvocationHandler);
+
+ return self;
+ }
+
+ @Override
+ public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+ RpcGateway gateway = registeredConnections.get(address);
+
+ if (gateway != null) {
+ if (clazz.isAssignableFrom(gateway.getClass())) {
+ @SuppressWarnings("unchecked")
+ C typedGateway = (C) gateway;
+ return Futures.successful(typedGateway);
+ } else {
+ return Futures.failed(
+ new Exception("Gateway registered under " + address + " is not of type " + clazz));
+ }
+ } else {
+ return Futures.failed(new Exception("No gateway registered under that name"));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // connections
+ // ------------------------------------------------------------------------
+
+ public void registerGateway(String address, RpcGateway gateway) {
+ checkNotNull(address);
+ checkNotNull(gateway);
+
+ if (registeredConnections.putIfAbsent(address, gateway) != null) {
+ throw new IllegalStateException("a gateway is already registered under " + address);
+ }
+ }
+
+ private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
+
+ private final T rpcEndpoint;
+
+ /** default timeout for asks */
+ private final Timeout timeout;
+
+ private final String address;
+
+ private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
+ this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+ }
+
+ private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
+ this.rpcEndpoint = rpcEndpoint;
+ this.timeout = timeout;
+ this.address = address;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Class<?> declaringClass = method.getDeclaringClass();
+ if (declaringClass.equals(MainThreadExecutor.class) ||
+ declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
+ declaringClass.equals(RpcGateway.class)) {
+ return method.invoke(this, args);
+ } else {
+ final String methodName = method.getName();
+ Class<?>[] parameterTypes = method.getParameterTypes();
+ Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+ Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+
+ final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
+ parameterTypes,
+ parameterAnnotations,
+ args);
+
+ Class<?> returnType = method.getReturnType();
+
+ if (returnType.equals(Future.class)) {
+ try {
+ Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
+ return Futures.successful(result);
+ } catch (Throwable e) {
+ return Futures.failed(e);
+ }
+ } else {
+ return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
+ }
+ }
+ }
+
+ /**
+ * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
+ * method with the provided method arguments. If the method has a return value, it is returned
+ * to the sender of the call.
+ */
+ private Object handleRpcInvocationSync(final String methodName,
+ final Class<?>[] parameterTypes,
+ final Object[] args,
+ final Timeout futureTimeout) throws Exception {
+ final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+ Object result = rpcMethod.invoke(rpcEndpoint, args);
+
+ if (result != null && result instanceof Future) {
+ Future<?> future = (Future<?>) result;
+ return Await.result(future, futureTimeout.duration());
+ } else {
+ return result;
+ }
+ }
+
+ @Override
+ public void runAsync(Runnable runnable) {
+ runnable.run();
+ }
+
+ @Override
+ public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
+ try {
+ return Futures.successful(callable.call());
+ } catch (Throwable e) {
+ return Futures.failed(e);
+ }
+ }
+
+ @Override
+ public void scheduleRunAsync(final Runnable runnable, final long delay) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(delay);
+ runnable.run();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public void start() {
+ // do nothing
+ }
+
+ @Override
+ public void stop() {
+ // do nothing
+ }
+
+ /**
+ * Look up the rpc method on the given {@link RpcEndpoint} instance.
+ *
+ * @param methodName Name of the method
+ * @param parameterTypes Parameter types of the method
+ * @return Method of the rpc endpoint
+ * @throws NoSuchMethodException Thrown if the method with the given name and parameter types
+ * cannot be found at the rpc endpoint
+ */
+ private Method lookupRpcMethod(final String methodName,
+ final Class<?>[] parameterTypes) throws NoSuchMethodException {
+ return rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
+ }
+
+ // ------------------------------------------------------------------------
+ // Helper methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
+ * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
+ * timeout is returned.
+ *
+ * @param parameterAnnotations Parameter annotations
+ * @param args Array of arguments
+ * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter
+ * has been found
+ * @return Timeout extracted from the array of arguments or the default timeout
+ */
+ private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args,
+ Timeout defaultTimeout) {
+ if (args != null) {
+ Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+ for (int i = 0; i < parameterAnnotations.length; i++) {
+ if (isRpcTimeout(parameterAnnotations[i])) {
+ if (args[i] instanceof FiniteDuration) {
+ return new Timeout((FiniteDuration) args[i]);
+ } else {
+ throw new RuntimeException("The rpc timeout parameter must be of type " +
+ FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+ " is not supported.");
+ }
+ }
+ }
+ }
+
+ return defaultTimeout;
+ }
+
+ /**
+ * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument
+ * list.
+ *
+ * @param parameterTypes Array of parameter types
+ * @param parameterAnnotations Array of parameter annotations
+ * @param args Arary of arguments
+ * @return Tuple of filtered parameter types and arguments which no longer contain the
+ * {@link RpcTimeout} annotated parameter types and arguments
+ */
+ private static Tuple2<Class<?>[], Object[]> filterArguments(
+ Class<?>[] parameterTypes,
+ Annotation[][] parameterAnnotations,
+ Object[] args) {
+
+ Class<?>[] filteredParameterTypes;
+ Object[] filteredArgs;
+
+ if (args == null) {
+ filteredParameterTypes = parameterTypes;
+ filteredArgs = null;
+ } else {
+ Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length);
+ Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+ BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length);
+ int numberRpcParameters = parameterTypes.length;
+
+ for (int i = 0; i < parameterTypes.length; i++) {
+ if (isRpcTimeout(parameterAnnotations[i])) {
+ isRpcTimeoutParameter.set(i);
+ numberRpcParameters--;
+ }
+ }
+
+ if (numberRpcParameters == parameterTypes.length) {
+ filteredParameterTypes = parameterTypes;
+ filteredArgs = args;
+ } else {
+ filteredParameterTypes = new Class<?>[numberRpcParameters];
+ filteredArgs = new Object[numberRpcParameters];
+ int counter = 0;
+
+ for (int i = 0; i < parameterTypes.length; i++) {
+ if (!isRpcTimeoutParameter.get(i)) {
+ filteredParameterTypes[counter] = parameterTypes[i];
+ filteredArgs[counter] = args[i];
+ counter++;
+ }
+ }
+ }
+ }
+ return Tuple2.of(filteredParameterTypes, filteredArgs);
+ }
+
+ /**
+ * Checks whether any of the annotations is of type {@link RpcTimeout}
+ *
+ * @param annotations Array of annotations
+ * @return True if {@link RpcTimeout} was found; otherwise false
+ */
+ private static boolean isRpcTimeout(Annotation[] annotations) {
+ for (Annotation annotation : annotations) {
+ if (annotation.annotationType().equals(RpcTimeout.class)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/36e4bf90/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
new file mode 100644
index 0000000..dfffeda
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * resourceManager HA test, including grant leadership and revoke leadership
+ */
+public class ResourceManagerHATest {
+
+ @Test
+ public void testGrantAndRevokeLeadership() throws Exception {
+ // mock a RpcService which will return a special RpcGateway when call its startServer method, the returned RpcGateway directly execute runAsync call
+ TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
+ doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
+
+ RpcService rpcService = mock(RpcService.class);
+ when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
+
+ TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+ TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+ highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+ final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices);
+ resourceManager.start();
+ // before grant leadership, resourceManager's leaderId is null
+ Assert.assertNull(resourceManager.getLeaderSessionID());
+ final UUID leaderId = UUID.randomUUID();
+ leaderElectionService.isLeader(leaderId);
+ // after grant leadership, resourceManager's leaderId has value
+ Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
+ // then revoke leadership, resourceManager's leaderId is null again
+ leaderElectionService.notLeader();
+ Assert.assertNull(resourceManager.getLeaderSessionID());
+ }
+
+ private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway {
+ @Override
+ public void runAsync(Runnable runnable) {
+ runnable.run();
+ }
+ }
+
+}