You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:24 UTC

[19/50] [abbrv] flink git commit: [FLINK-4516] leader election of resourcemanager

[FLINK-4516] leader election of resourcemanager

- add a serial RPC service
- add a special RpcService implementation that directly executes asynchronous calls serially, one by one; it is intended for test cases only
- change the ResourceManagerLeaderContender and TestingSerialRpcService code
- override the shutdown logic to stop the leader election service
- use a mocked RpcService rather than TestingSerialRpcService for the ResourceManager HA test

This closes #2427


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa0fe0d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa0fe0d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa0fe0d4

Branch: refs/heads/flip-6
Commit: fa0fe0d4d4a6428039c3ed343059d9fa5a75b80d
Parents: 1c9f9d4
Author: beyond1920 <be...@126.com>
Authored: Sat Aug 27 14:14:28 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:43 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |   7 +
 .../runtime/highavailability/NonHaServices.java |   5 +
 .../rpc/resourcemanager/ResourceManager.java    | 111 +++++-
 .../TestingHighAvailabilityServices.java        |  19 +-
 .../runtime/rpc/TestingSerialRpcService.java    | 369 +++++++++++++++++++
 .../resourcemanager/ResourceManagerHATest.java  |  76 ++++
 6 files changed, 578 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa0fe0d4/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 73e4f1f..298147c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -40,6 +40,13 @@ public interface HighAvailabilityServices {
 	LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
 
 	/**
+	 * Gets the leader election service for the cluster's resource manager.
+	 * @return Leader election service in which the resource manager contends for leadership
+	 * @throws Exception If the implementation fails to provide the leader election service
+	 */
+	LeaderElectionService getResourceManagerLeaderElectionService() throws Exception;
+
+	/**
 	 * Gets the leader election service for the given job.
 	 *
 	 * @param jobID The identifier of the job running the election.

http://git-wip-us.apache.org/repos/asf/flink/blob/fa0fe0d4/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 3d2769b..292a404 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -61,6 +61,11 @@ public class NonHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+		return new StandaloneLeaderElectionService();
+	}
+
+	@Override
 	public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
 		return new StandaloneLeaderElectionService();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa0fe0d4/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 6f34465..f7147c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -20,24 +20,26 @@ package org.apache.flink.runtime.rpc.resourcemanager;
 
 import akka.dispatch.Mapper;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.util.Preconditions;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * ResourceManager implementation. The resource manager is responsible for resource de-/allocation
@@ -50,16 +52,51 @@ import java.util.concurrent.ExecutorService;
  * </ul>
  */
 public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-	private final ExecutionContext executionContext;
 	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+	private final HighAvailabilityServices highAvailabilityServices;
+	private LeaderElectionService leaderElectionService = null;
+	private UUID leaderSessionID = null;
 
-	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
+	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
 		super(rpcService);
-		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
-			Preconditions.checkNotNull(executorService));
+		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.jobMasterGateways = new HashMap<>();
 	}
 
+	@Override
+	public void start() {
+		// start a leader
+		try {
+			super.start();
+			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
+			leaderElectionService.start(new ResourceManagerLeaderContender());
+		} catch (Throwable e) {
+			log.error("A fatal error happened when starting the ResourceManager", e);
+			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
+		}
+	}
+
+	/**
+	 * Stops the leader election service (if one has been obtained by {@link #start()}) and then
+	 * invokes the shut down logic of the super class.
+	 *
+	 * <p>The super class shut down is executed even if stopping the leader election service
+	 * fails, so that a broken election service cannot keep the endpoint alive.
+	 *
+	 * @throws RuntimeException wrapping any failure raised while shutting down
+	 */
+	@Override
+	public void shutDown() {
+		try {
+			try {
+				// start() may never have run, or may have failed before the service was assigned
+				if (leaderElectionService != null) {
+					leaderElectionService.stop();
+				}
+			} finally {
+				// always shut down the endpoint, even if stopping the election service failed
+				super.shutDown();
+			}
+		} catch (Throwable e) {
+			log.error("A fatal error happened when shutdown the ResourceManager", e);
+			throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
+		}
+	}
+
+	/**
+	 * Gets the leader session ID of this resource manager.
+	 *
+	 * @return the current leader session ID, or null while this resource manager does not hold leadership (before it is granted or after it is revoked)
+	 */
+	@VisibleForTesting
+	UUID getLeaderSessionID() {
+		return leaderSessionID;
+	}
+
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
@@ -116,4 +153,62 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 
 		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
 	}
+
+	private class ResourceManagerLeaderContender implements LeaderContender {
+
+		/**
+		 * Callback method when current resourceManager is granted leadership
+		 *
+		 * @param leaderSessionID unique leadershipID
+		 */
+		@Override
+		public void grantLeadership(final UUID leaderSessionID) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+					ResourceManager.this.leaderSessionID = leaderSessionID;
+					// confirming the leader session ID might be blocking,
+					leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+				}
+			});
+		}
+
+		/**
+		 * Callback method when current resourceManager lose leadership.
+		 */
+		@Override
+		public void revokeLeadership() {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.info("ResourceManager {} was revoked leadership.", getAddress());
+					jobMasterGateways.clear();
+					leaderSessionID = null;
+				}
+			});
+		}
+
+		@Override
+		public String getAddress() {
+			return ResourceManager.this.getAddress();
+		}
+
+		/**
+		 * Handles error occurring in the leader election service
+		 *
+		 * @param exception Exception being thrown in the leader election service
+		 */
+		@Override
+		public void handleError(final Exception exception) {
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.error("ResourceManager received an error from the LeaderElectionService.", exception);
+					// terminate ResourceManager in case of an error
+					shutDown();
+				}
+			});
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa0fe0d4/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 4d654a3..3162f40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -32,6 +32,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderElectionService jobMasterLeaderElectionService;
 
+	private volatile LeaderElectionService resourceManagerLeaderElectionService;
+
 
 	// ------------------------------------------------------------------------
 	//  Setters for mock / testing implementations
@@ -44,7 +46,11 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) {
 		this.jobMasterLeaderElectionService = leaderElectionService;
 	}
-	
+
+	public void setResourceManagerLeaderElectionService(LeaderElectionService leaderElectionService) {
+		this.resourceManagerLeaderElectionService = leaderElectionService;
+	}
+
 	// ------------------------------------------------------------------------
 	//  HA Services Methods
 	// ------------------------------------------------------------------------
@@ -69,4 +75,15 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 			throw new IllegalStateException("JobMasterLeaderElectionService has not been set");
 		}
 	}
+
+	/**
+	 * Returns the resource manager leader election service that was set for testing.
+	 *
+	 * @throws IllegalStateException if no such service has been set
+	 */
+	@Override
+	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+		// read the volatile field once so that the check and the return use the same value
+		final LeaderElectionService electionService = resourceManagerLeaderElectionService;
+
+		if (electionService == null) {
+			throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
+		}
+
+		return electionService;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa0fe0d4/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
new file mode 100644
index 0000000..7bdbb99
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.BitSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
+ */
+public class TestingSerialRpcService implements RpcService {
+
+	private final DirectExecutorService executorService;
+	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
+
+	public TestingSerialRpcService() {
+		executorService = new DirectExecutorService();
+		this.registeredConnections = new ConcurrentHashMap<>();
+	}
+
+	@Override
+	public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
+		try {
+			unit.sleep(delay);
+			runnable.run();
+		} catch (Throwable e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public ExecutionContext getExecutionContext() {
+		return ExecutionContexts.fromExecutorService(executorService);
+	}
+
+	@Override
+	public void stopService() {
+		executorService.shutdown();
+		registeredConnections.clear();
+	}
+
+	@Override
+	public void stopServer(RpcGateway selfGateway) {
+
+	}
+
+	@Override
+	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
+		final String address = UUID.randomUUID().toString();
+
+		InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
+		ClassLoader classLoader = getClass().getClassLoader();
+
+		@SuppressWarnings("unchecked")
+		C self = (C) Proxy.newProxyInstance(
+			classLoader,
+			new Class<?>[]{
+				rpcEndpoint.getSelfGatewayType(),
+				MainThreadExecutor.class,
+				StartStoppable.class,
+				RpcGateway.class
+			},
+			akkaInvocationHandler);
+
+		return self;
+	}
+
+	@Override
+	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+		RpcGateway gateway = registeredConnections.get(address);
+
+		if (gateway != null) {
+			if (clazz.isAssignableFrom(gateway.getClass())) {
+				@SuppressWarnings("unchecked")
+				C typedGateway = (C) gateway;
+				return Futures.successful(typedGateway);
+			} else {
+				return Futures.failed(
+					new Exception("Gateway registered under " + address + " is not of type " + clazz));
+			}
+		} else {
+			return Futures.failed(new Exception("No gateway registered under that name"));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// connections
+	// ------------------------------------------------------------------------
+
+	public void registerGateway(String address, RpcGateway gateway) {
+		checkNotNull(address);
+		checkNotNull(gateway);
+
+		if (registeredConnections.putIfAbsent(address, gateway) != null) {
+			throw new IllegalStateException("a gateway is already registered under " + address);
+		}
+	}
+
+	private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
+
+		private final T rpcEndpoint;
+
+		/** default timeout for asks */
+		private final Timeout timeout;
+
+		private final String address;
+
+		private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
+			this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+		}
+
+		private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
+			this.rpcEndpoint = rpcEndpoint;
+			this.timeout = timeout;
+			this.address = address;
+		}
+
+		@Override
+		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+			Class<?> declaringClass = method.getDeclaringClass();
+			if (declaringClass.equals(MainThreadExecutor.class) ||
+				declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
+				declaringClass.equals(RpcGateway.class)) {
+				return method.invoke(this, args);
+			} else {
+				final String methodName = method.getName();
+				Class<?>[] parameterTypes = method.getParameterTypes();
+				Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+				Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+
+				final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
+					parameterTypes,
+					parameterAnnotations,
+					args);
+
+				Class<?> returnType = method.getReturnType();
+
+				if (returnType.equals(Future.class)) {
+					try {
+						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
+						return Futures.successful(result);
+					} catch (Throwable e) {
+						return Futures.failed(e);
+					}
+				} else {
+					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
+				}
+			}
+		}
+
+		/**
+		 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
+		 * method with the provided method arguments. If the method has a return value, it is returned
+		 * to the sender of the call.
+		 */
+		private Object handleRpcInvocationSync(final String methodName,
+			final Class<?>[] parameterTypes,
+			final Object[] args,
+			final Timeout futureTimeout) throws Exception {
+			final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+			Object result = rpcMethod.invoke(rpcEndpoint, args);
+
+			if (result != null && result instanceof Future) {
+				Future<?> future = (Future<?>) result;
+				return Await.result(future, futureTimeout.duration());
+			} else {
+				return result;
+			}
+		}
+
+		@Override
+		public void runAsync(Runnable runnable) {
+			runnable.run();
+		}
+
+		@Override
+		public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
+			try {
+				return Futures.successful(callable.call());
+			} catch (Throwable e) {
+				return Futures.failed(e);
+			}
+		}
+
+		@Override
+		public void scheduleRunAsync(final Runnable runnable, final long delay) {
+			try {
+				TimeUnit.MILLISECONDS.sleep(delay);
+				runnable.run();
+			} catch (Throwable e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public String getAddress() {
+			return address;
+		}
+
+		@Override
+		public void start() {
+			// do nothing
+		}
+
+		@Override
+		public void stop() {
+			// do nothing
+		}
+
+		/**
+		 * Look up the rpc method on the given {@link RpcEndpoint} instance.
+		 *
+		 * @param methodName     Name of the method
+		 * @param parameterTypes Parameter types of the method
+		 * @return Method of the rpc endpoint
+		 * @throws NoSuchMethodException Thrown if the method with the given name and parameter types
+		 *                               cannot be found at the rpc endpoint
+		 */
+		private Method lookupRpcMethod(final String methodName,
+			final Class<?>[] parameterTypes) throws NoSuchMethodException {
+			return rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
+		}
+
+		// ------------------------------------------------------------------------
+		//  Helper methods
+		// ------------------------------------------------------------------------
+
+		/**
+		 * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
+		 * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
+		 * timeout is returned.
+		 *
+		 * @param parameterAnnotations Parameter annotations
+		 * @param args                 Array of arguments
+		 * @param defaultTimeout       Default timeout to return if no {@link RpcTimeout} annotated parameter
+		 *                             has been found
+		 * @return Timeout extracted from the array of arguments or the default timeout
+		 */
+		private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args,
+			Timeout defaultTimeout) {
+			if (args != null) {
+				Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+				for (int i = 0; i < parameterAnnotations.length; i++) {
+					if (isRpcTimeout(parameterAnnotations[i])) {
+						if (args[i] instanceof FiniteDuration) {
+							return new Timeout((FiniteDuration) args[i]);
+						} else {
+							throw new RuntimeException("The rpc timeout parameter must be of type " +
+								FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+								" is not supported.");
+						}
+					}
+				}
+			}
+
+			return defaultTimeout;
+		}
+
+		/**
+		 * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument
+		 * list.
+		 *
+		 * @param parameterTypes       Array of parameter types
+		 * @param parameterAnnotations Array of parameter annotations
+		 * @param args                 Arary of arguments
+		 * @return Tuple of filtered parameter types and arguments which no longer contain the
+		 * {@link RpcTimeout} annotated parameter types and arguments
+		 */
+		private static Tuple2<Class<?>[], Object[]> filterArguments(
+			Class<?>[] parameterTypes,
+			Annotation[][] parameterAnnotations,
+			Object[] args) {
+
+			Class<?>[] filteredParameterTypes;
+			Object[] filteredArgs;
+
+			if (args == null) {
+				filteredParameterTypes = parameterTypes;
+				filteredArgs = null;
+			} else {
+				Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length);
+				Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+				BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length);
+				int numberRpcParameters = parameterTypes.length;
+
+				for (int i = 0; i < parameterTypes.length; i++) {
+					if (isRpcTimeout(parameterAnnotations[i])) {
+						isRpcTimeoutParameter.set(i);
+						numberRpcParameters--;
+					}
+				}
+
+				if (numberRpcParameters == parameterTypes.length) {
+					filteredParameterTypes = parameterTypes;
+					filteredArgs = args;
+				} else {
+					filteredParameterTypes = new Class<?>[numberRpcParameters];
+					filteredArgs = new Object[numberRpcParameters];
+					int counter = 0;
+
+					for (int i = 0; i < parameterTypes.length; i++) {
+						if (!isRpcTimeoutParameter.get(i)) {
+							filteredParameterTypes[counter] = parameterTypes[i];
+							filteredArgs[counter] = args[i];
+							counter++;
+						}
+					}
+				}
+			}
+			return Tuple2.of(filteredParameterTypes, filteredArgs);
+		}
+
+		/**
+		 * Checks whether any of the annotations is of type {@link RpcTimeout}
+		 *
+		 * @param annotations Array of annotations
+		 * @return True if {@link RpcTimeout} was found; otherwise false
+		 */
+		private static boolean isRpcTimeout(Annotation[] annotations) {
+			for (Annotation annotation : annotations) {
+				if (annotation.annotationType().equals(RpcTimeout.class)) {
+					return true;
+				}
+			}
+
+			return false;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa0fe0d4/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
new file mode 100644
index 0000000..dfffeda
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * High availability test of the {@link ResourceManager}, covering the granting and the
+ * revocation of leadership.
+ */
+public class ResourceManagerHATest {
+
+	@Test
+	public void testGrantAndRevokeLeadership() throws Exception {
+		// the mocked RpcService hands out a self gateway whose runAsync runs the runnable directly
+		final TestingResourceManagerGatewayProxy selfGateway = mock(TestingResourceManagerGatewayProxy.class);
+		doCallRealMethod().when(selfGateway).runAsync(any(Runnable.class));
+
+		final RpcService mockedRpcService = mock(RpcService.class);
+		when(mockedRpcService.startServer(any(RpcEndpoint.class))).thenReturn(selfGateway);
+
+		final TestingLeaderElectionService electionService = new TestingLeaderElectionService();
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setResourceManagerLeaderElectionService(electionService);
+
+		final ResourceManager resourceManager = new ResourceManager(mockedRpcService, haServices);
+		resourceManager.start();
+
+		// no leader session id is known before leadership has been granted
+		Assert.assertNull(resourceManager.getLeaderSessionID());
+
+		// granting leadership makes the resource manager adopt the given session id
+		final UUID grantedSessionId = UUID.randomUUID();
+		electionService.isLeader(grantedSessionId);
+		Assert.assertEquals(grantedSessionId, resourceManager.getLeaderSessionID());
+
+		// revoking leadership resets the session id to null
+		electionService.notLeader();
+		Assert.assertNull(resourceManager.getLeaderSessionID());
+	}
+
+	/**
+	 * Self gateway type to be mocked; only {@code runAsync} is real and runs the runnable directly.
+	 */
+	private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway {
+		@Override
+		public void runAsync(Runnable runnable) {
+			runnable.run();
+		}
+	}
+
+}