Posted to issues@flink.apache.org by beyond1920 <gi...@git.apache.org> on 2016/08/27 06:26:51 UTC

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

GitHub user beyond1920 opened a pull request:

    https://github.com/apache/flink/pull/2427

    [FLINK-4516] [cluster management]leader election of resourcemanager

    This pull request implements ResourceManager leader election, which includes:
    1. When a ResourceManager is started, it first starts the leader election service and takes part in contending for leadership.
    2. Every ResourceManager contains a ResourceManagerLeaderContender. When it is granted leadership, it starts the SlotManager and its other main components. When its leadership is revoked, it stops all its components and clears its state.
    
    The main changes are:
    1. Add ResourceManagerLeaderContender.
    2. Add a getResourceManagerLeaderElectionService method to HighAvailabilityServices, NonHaServices, and TestingHighAvailabilityServices to get the leader election service for the ResourceManager.
    3. Add a test for ResourceManager HA.
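
The grant/revoke lifecycle described above can be sketched roughly as follows. This is only an illustration, not the code in this pull request: `Contender`, `ToyResourceManager`, and the `componentsRunning` flag are hypothetical stand-ins for Flink's `LeaderContender`, the ResourceManager, and its SlotManager and other components.

```java
import java.util.UUID;

/** Hypothetical, simplified stand-in for the leader contender callbacks. */
interface Contender {
    void grantLeadership(UUID leaderSessionId);
    void revokeLeadership();
}

/** Toy resource manager: starts its components on grant, clears state on revoke. */
class ToyResourceManager implements Contender {
    private UUID leaderSessionId;      // null while this instance is not the leader
    private boolean componentsRunning; // stands in for SlotManager and other components

    @Override
    public void grantLeadership(UUID newLeaderSessionId) {
        leaderSessionId = newLeaderSessionId;
        componentsRunning = true;
    }

    @Override
    public void revokeLeadership() {
        componentsRunning = false;
        leaderSessionId = null;
    }

    UUID getLeaderSessionId() { return leaderSessionId; }

    boolean isRunning() { return componentsRunning; }
}

class LeaderElectionDemo {
    public static void main(String[] args) {
        ToyResourceManager rm = new ToyResourceManager();
        UUID session = UUID.randomUUID();
        rm.grantLeadership(session);
        System.out.println("is leader: " + session.equals(rm.getLeaderSessionId()));
        rm.revokeLeadership();
        System.out.println("session after revoke: " + rm.getLeaderSessionId());
    }
}
```

In the real ResourceManager the callbacks would be dispatched onto the endpoint's main thread; the sketch leaves that out to keep the state transitions visible.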

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-4516

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2427.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2427
    
----
commit 3117989b87c5e9a002251353a47271fed0a84271
Author: beyond1920 <be...@126.com>
Date:   2016-08-27T06:14:28Z

    leader election of resourcemanager

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76779266
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.util.Timeout;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * resourceManager HA test, including grant leadership and revoke leadership
    + */
    +public class ResourceManagerHATest {
    +
    +	private RpcService rpcService;
    +
    +	@Before
    +	public void setup() throws Exception {
    +		rpcService = new TestingRpcService();
    +	}
    +
    +	@After
    +	public void teardown() throws Exception {
    +		rpcService.stopService();
    +	}
    +
    +	@Test
    +	public void testGrantAndRevokeLeadership() throws Exception {
    --- End diff --
    
    Hi Till, thanks for the good advice. I added a new RpcService implementation for testing.



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76810632
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -116,4 +142,78 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
     
     		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
     	}
    +
    +	/**
    +	 * Callback method when current resourceManager is granted leadership
    +	 *
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking,
    +				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when current resourceManager lose leadership.
    +	 */
    +	void handleRevokeLeadership() {
    --- End diff --
    
    This method can be private.



[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2427
  
    Thank you for the update @beyond1920. I will merge your changes later on.



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76816215
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc;
    +
    +import akka.dispatch.ExecutionContexts;
    +import akka.dispatch.Futures;
    +import akka.util.Timeout;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.util.DirectExecutorService;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.lang.annotation.Annotation;
    +import java.lang.reflect.InvocationHandler;
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Proxy;
    +import java.util.BitSet;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +/**
    + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
    + */
    +public class TestingSerialRpcService extends TestingRpcService {
    --- End diff --
    
    This still extends the full `TestingRpcService`, which extends the `AkkaRpcService`. I think we can just implement this as a direct `RpcService`.



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76738145
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.util.Timeout;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * resourceManager HA test, including grant leadership and revoke leadership
    + */
    +public class ResourceManagerHATest {
    +
    +	private RpcService rpcService;
    +
    +	@Before
    +	public void setup() throws Exception {
    +		rpcService = new TestingRpcService();
    +	}
    +
    +	@After
    +	public void teardown() throws Exception {
    +		rpcService.stopService();
    +	}
    +
    +	@Test
    +	public void testGrantAndRevokeLeadership() throws Exception {
    --- End diff --
    
    Hi Till. Even if I had an RpcService that directly executes the asynchronous calls, I could not simply call resourceManager.getLeaderSessionID(). Although every call inside the ResourceManager would be executed serially by the new RpcService, resourceManager.getLeaderSessionID() would still run in the main thread of the test case if I called it directly instead of executing it through the new RpcService, right?
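
The threading question above can be checked with a small sketch. It assumes a direct executor (one that runs each task in the caller's thread), which is what a serial test service would presumably use; `DirectExecutionDemo` and `runDirect` are hypothetical names, not part of Flink.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class DirectExecutionDemo {

    /** Runs the task on a direct executor and returns the thread that executed it. */
    static Thread runDirect(Runnable task) {
        Executor direct = Runnable::run; // executes each task in the calling thread
        AtomicReference<Thread> executedOn = new AtomicReference<>();
        direct.execute(() -> {
            task.run();
            executedOn.set(Thread.currentThread());
        });
        return executedOn.get();
    }

    public static void main(String[] args) {
        Thread worker = runDirect(() -> { });
        // With direct execution the "RPC main thread" is the test's own thread.
        System.out.println(worker == Thread.currentThread());
    }
}
```

Under that assumption, the asynchronous calls and a direct getter call from the test both run on the same thread, so there is no second thread to race with.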



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76961323
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.util.UUID;
    +
    +/**
    + * resourceManager HA test, including grant leadership and revoke leadership
    + */
    +public class ResourceManagerHATest {
    +
    +	private RpcService rpcService;
    +
    +	@Before
    +	public void setup() throws Exception {
    +		rpcService = new TestingSerialRpcService();
    --- End diff --
    
    I think it would suffice to use a mocked `RpcService` for the test, e.g. `Mockito.mock(RpcService.class)`. Nevertheless, the serial rpc service could be useful in other tests which don't need a full AkkaRpcService.



[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2427
  
    @mxm, thanks for your review. I have changed the code based on your advice. The changes are as follows:
    1. Moved all code in the methods handleGrantLeadership, revokeLeadership, and onLeaderElectionError from ResourceManager to the ResourceManagerLeaderContender class.
    2. Changed the getAddress method in ResourceManagerLeaderContender.
    3. Changed the callAsync method in TestingSerialRpcService; it now executes immediately rather than waiting for some time.
    4. Changed TestingSerialRpcService to implement RpcService directly rather than extend TestingRpcService.



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76813885
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -116,4 +142,78 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
     
     		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
     	}
    +
    +	/**
    +	 * Callback method when current resourceManager is granted leadership
    +	 *
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking,
    +				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when current resourceManager lose leadership.
    +	 */
    +	void handleRevokeLeadership() {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was revoked leadership.", getAddress());
    +				jobMasterGateways.clear();
    +				leaderSessionID = null;
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when an error happened to current resourceManager on leader election
    +	 * @param e
    +	 */
    +	void onLeaderElectionError(final Throwable e) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.error("ResourceManager received an error from the LeaderElectionService.", e);
    +				// terminate ResourceManager in case of an error
    +				shutDown();
    +			}
    +		});
    +	}
    +
    +	private class ResourceManagerLeaderContender implements LeaderContender {
    +
    +		@Override
    +		public void grantLeadership(UUID leaderSessionID) {
    +			handleGrantLeadership(leaderSessionID);
    +		}
    +
    +		@Override
    +		public void revokeLeadership() {
    +			handleRevokeLeadership();
    +		}
    +
    +		@Override
    +		public String getAddress() {
    +			return getAddress();
    --- End diff --
    
    This is a recursive call which will lead to a StackOverflowError.
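
One common way to avoid this kind of recursion in an inner class is to qualify the call with the enclosing instance. The sketch below is a generic illustration; `Endpoint`, `InnerContender`, and `AddressProvider` are hypothetical names, and the pull request may have fixed it differently.

```java
/** Hypothetical interface standing in for the contender's address accessor. */
interface AddressProvider {
    String getAddress();
}

class Endpoint {
    private final String address;

    Endpoint(String address) {
        this.address = address;
    }

    String getAddress() {
        return address;
    }

    class InnerContender implements AddressProvider {
        @Override
        public String getAddress() {
            // An unqualified getAddress() would resolve to this very method and recurse.
            // Qualifying with the outer instance calls Endpoint#getAddress instead.
            return Endpoint.this.getAddress();
        }
    }

    AddressProvider contender() {
        return new InnerContender();
    }
}
```

For example, `new Endpoint("rm-1").contender().getAddress()` returns the outer endpoint's address instead of overflowing the stack.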



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76909528
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc;
    +
    +import akka.dispatch.ExecutionContexts;
    +import akka.dispatch.Futures;
    +import akka.util.Timeout;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.util.DirectExecutorService;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.lang.annotation.Annotation;
    +import java.lang.reflect.InvocationHandler;
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Proxy;
    +import java.util.BitSet;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +/**
    + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
    + */
    +public class TestingSerialRpcService extends TestingRpcService {
    +
    +	private final DirectExecutorService executorService;
    +	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
    +
    +	public TestingSerialRpcService() {
    +		executorService = new DirectExecutorService();
    +		this.registeredConnections = new ConcurrentHashMap<>();
    +	}
    +
    +	@Override
    +	public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
    +		try {
    +			unit.sleep(delay);
    +			runnable.run();
    +		} catch (Throwable e) {
    +			throw new RuntimeException(e);
    +		}
    +	}
    +
    +	@Override
    +	public ExecutionContext getExecutionContext() {
    +		return ExecutionContexts.fromExecutorService(executorService);
    +	}
    +
    +	@Override
    +	public void stopService() {
    +		executorService.shutdown();
    +		registeredConnections.clear();
    +	}
    +
    +	@Override
    +	public void stopServer(RpcGateway selfGateway) {
    +
    +	}
    +
    +	@Override
    +	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
    +		final String address = UUID.randomUUID().toString();
    +
    +		InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
    +		ClassLoader classLoader = getClass().getClassLoader();
    +
    +		@SuppressWarnings("unchecked")
    +		C self = (C) Proxy.newProxyInstance(
    +			classLoader,
    +			new Class<?>[]{
    +				rpcEndpoint.getSelfGatewayType(),
    +				MainThreadExecutor.class,
    +				StartStoppable.class,
    +				RpcGateway.class},
    +			akkaInvocationHandler);
    +
    +		return self;
    +	}
    +
    +	private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
    +
    +		private final T rpcEndpoint;
    +
    +		/** default timeout for asks */
    +		private final Timeout timeout;
    +
    +		private final String address;
    +
    +		private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
    +			this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
    +		}
    +
    +		private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
    +			this.rpcEndpoint = rpcEndpoint;
    +			this.timeout = timeout;
    +			this.address = address;
    +		}
    +
    +		@Override
    +		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    +			Class<?> declaringClass = method.getDeclaringClass();
    +			if (declaringClass.equals(MainThreadExecutor.class) ||
    +				declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
    +				declaringClass.equals(RpcGateway.class)) {
    +				return method.invoke(this, args);
    +			} else {
    +				final String methodName = method.getName();
    +				Class<?>[] parameterTypes = method.getParameterTypes();
    +				Annotation[][] parameterAnnotations = method.getParameterAnnotations();
    +				Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
    +
    +				final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
    +					parameterTypes,
    +					parameterAnnotations,
    +					args);
    +
    +				Class<?> returnType = method.getReturnType();
    +
    +				if (returnType.equals(Future.class)) {
    +					try {
    +						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
    +						return Futures.successful(result);
    +					} catch (Throwable e) {
    +						return Futures.failed(e);
    +					}
    +				} else {
    +					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
    +				}
    +			}
    +		}
    +
    +		/**
    +		 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
    +		 * method with the provided method arguments. If the method has a return value, it is returned
    +		 * to the sender of the call.
    +		 */
    +		private Object handleRpcInvocationSync(final String methodName,
    +			final Class<?>[] parameterTypes,
    +			final Object[] args,
    +			final Timeout futureTimeout) throws Exception {
    +			final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
    +			Object result = rpcMethod.invoke(rpcEndpoint, args);
    +
    +			if (result != null && result instanceof Future) {
    +				Future<?> future = (Future<?>) result;
    +				return Await.result(future, futureTimeout.duration());
    +			} else {
    +				return result;
    +			}
    +		}
    +
    +		@Override
    +		public void runAsync(Runnable runnable) {
    +			runnable.run();
    +		}
    +
    +		@Override
    +		public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
    +			try {
    +				TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis());
    --- End diff --
    
    Yes, I confused the delay parameter with the callTimeout parameter. Executing immediately is right.
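
A minimal sketch of a `callAsync` that executes immediately is shown below. It deliberately swaps Scala's `Future` and Akka's `Timeout` for `java.util.concurrent.CompletableFuture` and a plain millisecond value so that it stays self-contained; `SerialCalls` is a hypothetical name and this is not the code that was merged.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

class SerialCalls {

    /**
     * Runs the callable right away in the calling thread.
     * The timeout only bounds how long a caller may wait for the result; it is not a
     * start delay. It goes unused here because the result is ready before returning.
     */
    static <V> CompletableFuture<V> callAsync(Callable<V> callable, long timeoutMillis) {
        CompletableFuture<V> result = new CompletableFuture<>();
        try {
            result.complete(callable.call());
        } catch (Exception e) {
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

The returned future is already completed when `callAsync` returns, so a test can read the value without waiting.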



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76810647
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -116,4 +142,78 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
     
     		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
     	}
    +
    +	/**
    +	 * Callback method when current resourceManager is granted leadership
    +	 *
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking,
    +				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when current resourceManager lose leadership.
    +	 */
    +	void handleRevokeLeadership() {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was revoked leadership.", getAddress());
    +				jobMasterGateways.clear();
    +				leaderSessionID = null;
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when an error happened to current resourceManager on leader election
    +	 * @param e
    +	 */
    +	void onLeaderElectionError(final Throwable e) {
    --- End diff --
    
    This method can be private.



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76912353
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -116,4 +142,78 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
     
     		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
     	}
    +
    +	/**
    +	 * Callback method when current resourceManager is granted leadership
    +	 *
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking,
    +				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when current resourceManager lose leadership.
    +	 */
    +	void handleRevokeLeadership() {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was revoked leadership.", getAddress());
    +				jobMasterGateways.clear();
    +				leaderSessionID = null;
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when an error happened to current resourceManager on leader election
    +	 * @param e
    +	 */
    +	void onLeaderElectionError(final Throwable e) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.error("ResourceManager received an error from the LeaderElectionService.", e);
    +				// terminate ResourceManager in case of an error
    +				shutDown();
    +			}
    +		});
    +	}
    +
    +	private class ResourceManagerLeaderContender implements LeaderContender {
    +
    +		@Override
    +		public void grantLeadership(UUID leaderSessionID) {
    +			handleGrantLeadership(leaderSessionID);
    +		}
    +
    +		@Override
    +		public void revokeLeadership() {
    +			handleRevokeLeadership();
    +		}
    +
    +		@Override
    +		public String getAddress() {
    +			return getAddress();
    --- End diff --
    
    Yes, sorry for such a basic mistake.
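
For readers of the archive: the quoted diff ends on an inner-class `getAddress()` whose body calls `getAddress()` unqualified. Inside the inner class that name resolves to the inner method itself, so the call would recurse until the stack overflows. The following is a minimal sketch of the usual remedy, qualifying the call with the outer instance. `Endpoint`, `AddressContender` and `InnerContender` are hypothetical stand-ins, not the Flink classes, and the sketch does not claim to be the fix that was eventually committed.

```java
// Hypothetical stand-in for the LeaderContender interface in the diff.
interface AddressContender {
    String getAddress();
}

// Hypothetical stand-in for the enclosing RPC endpoint (ResourceManager in the diff).
class Endpoint {
    private final String address;

    Endpoint(String address) {
        this.address = address;
    }

    String getAddress() {
        return address;
    }

    AddressContender contender() {
        return new InnerContender();
    }

    // Non-static inner class, so it holds a reference to the enclosing Endpoint.
    private class InnerContender implements AddressContender {
        @Override
        public String getAddress() {
            // An unqualified getAddress() here would call this same method again.
            // Qualifying with Endpoint.this selects the outer class's method instead.
            return Endpoint.this.getAddress();
        }
    }
}
```

With the qualified call, `new Endpoint("rm-address").contender().getAddress()` returns the endpoint's own address instead of recursing.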


---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76813171
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc;
    +
    +import akka.dispatch.ExecutionContexts;
    +import akka.dispatch.Futures;
    +import akka.util.Timeout;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.util.DirectExecutorService;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.lang.annotation.Annotation;
    +import java.lang.reflect.InvocationHandler;
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Proxy;
    +import java.util.BitSet;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +/**
    + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
    + */
    +public class TestingSerialRpcService extends TestingRpcService {
    +
    +	private final DirectExecutorService executorService;
    +	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
    +
    +	public TestingSerialRpcService() {
    +		executorService = new DirectExecutorService();
    +		this.registeredConnections = new ConcurrentHashMap<>();
    +	}
    +
    +	@Override
    +	public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
    +		try {
    +			unit.sleep(delay);
    +			runnable.run();
    +		} catch (Throwable e) {
    +			throw new RuntimeException(e);
    +		}
    +	}
    +
    +	@Override
    +	public ExecutionContext getExecutionContext() {
    +		return ExecutionContexts.fromExecutorService(executorService);
    +	}
    +
    +	@Override
    +	public void stopService() {
    +		executorService.shutdown();
    +		registeredConnections.clear();
    +	}
    +
    +	@Override
    +	public void stopServer(RpcGateway selfGateway) {
    +
    +	}
    +
    +	@Override
    +	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
    +		final String address = UUID.randomUUID().toString();
    +
    +		InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
    +		ClassLoader classLoader = getClass().getClassLoader();
    +
    +		@SuppressWarnings("unchecked")
    +		C self = (C) Proxy.newProxyInstance(
    +			classLoader,
    +			new Class<?>[]{
    +				rpcEndpoint.getSelfGatewayType(),
    +				MainThreadExecutor.class,
    +				StartStoppable.class,
    +				RpcGateway.class},
    +			akkaInvocationHandler);
    +
    +		return self;
    +	}
    +
    +	private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
    +
    +		private final T rpcEndpoint;
    +
    +		/** default timeout for asks */
    +		private final Timeout timeout;
    +
    +		private final String address;
    +
    +		private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
    +			this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
    +		}
    +
    +		private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
    +			this.rpcEndpoint = rpcEndpoint;
    +			this.timeout = timeout;
    +			this.address = address;
    +		}
    +
    +		@Override
    +		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    +			Class<?> declaringClass = method.getDeclaringClass();
    +			if (declaringClass.equals(MainThreadExecutor.class) ||
    +				declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
    +				declaringClass.equals(RpcGateway.class)) {
    +				return method.invoke(this, args);
    +			} else {
    +				final String methodName = method.getName();
    +				Class<?>[] parameterTypes = method.getParameterTypes();
    +				Annotation[][] parameterAnnotations = method.getParameterAnnotations();
    +				Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
    +
    +				final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
    +					parameterTypes,
    +					parameterAnnotations,
    +					args);
    +
    +				Class<?> returnType = method.getReturnType();
    +
    +				if (returnType.equals(Future.class)) {
    +					try {
    +						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
    +						return Futures.successful(result);
    +					} catch (Throwable e) {
    +						return Futures.failed(e);
    +					}
    +				} else {
    +					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
    +				}
    +			}
    +		}
    +
    +		/**
    +		 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
    +		 * method with the provided method arguments. If the method has a return value, it is returned
    +		 * to the sender of the call.
    +		 */
    +		private Object handleRpcInvocationSync(final String methodName,
    +			final Class<?>[] parameterTypes,
    +			final Object[] args,
    +			final Timeout futureTimeout) throws Exception {
    +			final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
    +			Object result = rpcMethod.invoke(rpcEndpoint, args);
    +
    +			if (result != null && result instanceof Future) {
    +				Future<?> future = (Future<?>) result;
    +				return Await.result(future, futureTimeout.duration());
    +			} else {
    +				return result;
    +			}
    +		}
    +
    +		@Override
    +		public void runAsync(Runnable runnable) {
    +			runnable.run();
    +		}
    +
    +		@Override
    +		public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
    +			try {
    +				TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis());
    --- End diff --
    
    Why do you sleep here for the timeout duration? We could think about adding some delay `0 < delay < callTimeout`, but for now I think executing immediately is fine.
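
A minimal sketch of what "executing immediately" could look like, under stated assumptions: `ImmediateCalls` is a hypothetical helper, and `java.util.concurrent.CompletableFuture` stands in for the Scala `Future` used in the diff so that the example is self-contained. The timeout argument is dropped here because nothing waits.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of Flink. CompletableFuture replaces the Scala Future
// from the diff purely to keep the sketch free of external dependencies.
final class ImmediateCalls {
    private ImmediateCalls() {}

    // Runs the callable on the calling thread right away, without sleeping,
    // and returns a future that is already completed when this method returns.
    static <V> CompletableFuture<V> callAsync(Callable<V> callable) {
        CompletableFuture<V> future = new CompletableFuture<>();
        try {
            future.complete(callable.call());
        } catch (Throwable t) {
            // Failures are reported through the future rather than thrown to the caller.
            future.completeExceptionally(t);
        }
        return future;
    }
}
```

Because the future is complete on return, a test can read the result straight away and never pays the timeout as a fixed delay.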


---

[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2427
  
    Thank you. Merging.


---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76608948
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.util.Timeout;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * resourceManager HA test, including grant leadership and revoke leadership
    + */
    +public class ResourceManagerHATest {
    +
    +	private RpcService rpcService;
    +
    +	@Before
    +	public void setup() throws Exception {
    +		rpcService = new TestingRpcService();
    +	}
    +
    +	@After
    +	public void teardown() throws Exception {
    +		rpcService.stopService();
    +	}
    +
    +	@Test
    +	public void testGrantAndRevokeLeadership() throws Exception {
    --- End diff --
    
    I don't think you need a full-blown `AkkaRpcService` here. It would be better to use an `RpcService` implementation that directly executes the asynchronous calls. Such an implementation does not exist yet, so you would have to write it. Then you don't need the `getLatestLeaderId` method; you can simply call `resourceManager.getLeaderSessionID()`.


---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76810618
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -116,4 +142,78 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
     
     		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
     	}
    +
    +	/**
    +	 * Callback method when current resourceManager is granted leadership
    +	 *
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    --- End diff --
    
    This method can be private.


---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76745278
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import akka.util.Timeout;
    +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.TestingRpcService;
    +import org.junit.After;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * resourceManager HA test, including grant leadership and revoke leadership
    + */
    +public class ResourceManagerHATest {
    +
    +	private RpcService rpcService;
    +
    +	@Before
    +	public void setup() throws Exception {
    +		rpcService = new TestingRpcService();
    +	}
    +
    +	@After
    +	public void teardown() throws Exception {
    +		rpcService.stopService();
    +	}
    +
    +	@Test
    +	public void testGrantAndRevokeLeadership() throws Exception {
    --- End diff --
    
    No, if you use an execution context like the `DirectExecutorService` or something similar to `CallingThreadDispatcher`, then everything will be executed by the calling thread, which in your case is the test thread. If that is the case, then you have a serial execution.
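
The point about serial execution can be illustrated with a small sketch. `DirectExecutor` below is a hypothetical stand-in for Flink's `DirectExecutorService` (it is not that class), and `SerialExecutionDemo` is an invented name. The sketch only shows that an executor which runs tasks inline executes them on the calling thread, in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical stand-in for a direct executor: each task runs inline in execute().
final class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run(); // no hand-off to another thread
    }
}

final class SerialExecutionDemo {
    private SerialExecutionDemo() {}

    // Submits three tasks and records, for each, its id and whether it ran on the caller's thread.
    static List<String> run() {
        Executor executor = new DirectExecutor();
        List<String> log = new ArrayList<>();
        Thread caller = Thread.currentThread();
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            executor.execute(() ->
                log.add("task-" + id + ":" + (Thread.currentThread() == caller)));
        }
        // Every task has already finished here, so the log is complete and ordered.
        return log;
    }
}
```

In a test this means an assertion placed directly after a call sees the call's effects, with no waiting or polling.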


---

[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2427
  
    Hi @mxm, thanks so much. I have already closed the PR.


---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76909719
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc;
    +
    +import akka.dispatch.ExecutionContexts;
    +import akka.dispatch.Futures;
    +import akka.util.Timeout;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.util.DirectExecutorService;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.lang.annotation.Annotation;
    +import java.lang.reflect.InvocationHandler;
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Proxy;
    +import java.util.BitSet;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +/**
    + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
    + */
    +public class TestingSerialRpcService extends TestingRpcService {
    --- End diff --
    
    Yes, of course. I thought that by extending `TestingRpcService`, the new `RpcService` could reuse its `registerGateway` and `connect` methods. I can change it.


---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 closed the pull request at:

    https://github.com/apache/flink/pull/2427


---

[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2427
  
    Auto-closing from a commit doesn't work in the feature branch. Could you please close the PR?


---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76811799
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -116,4 +142,78 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
     
     		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
     	}
    +
    +	/**
    +	 * Callback method when current resourceManager is granted leadership
    +	 *
    +	 * @param newLeaderSessionID unique leadershipID
    +	 */
    +	void handleGrantLeadership(final UUID newLeaderSessionID) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
    +				leaderSessionID = newLeaderSessionID;
    +				// confirming the leader session ID might be blocking,
    +				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when current resourceManager lose leadership.
    +	 */
    +	void handleRevokeLeadership() {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.info("ResourceManager {} was revoked leadership.", getAddress());
    +				jobMasterGateways.clear();
    +				leaderSessionID = null;
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Callback method when an error happened to current resourceManager on leader election
    +	 * @param e
    +	 */
    +	void onLeaderElectionError(final Throwable e) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.error("ResourceManager received an error from the LeaderElectionService.", e);
    +				// terminate ResourceManager in case of an error
    +				shutDown();
    +			}
    +		});
    +	}
    +
    +	private class ResourceManagerLeaderContender implements LeaderContender {
    +
    +		@Override
    +		public void grantLeadership(UUID leaderSessionID) {
    +			handleGrantLeadership(leaderSessionID);
    --- End diff --
    
    I wonder why you call methods in the main class. Couldn't all the code in the methods `handleGrantLeadership`, `handleRevokeLeadership`, and `onLeaderElectionError` simply be implemented in this class? Of course, they should still call `runAsync` to execute in the main RPC thread.
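
A rough sketch of the suggested structure, under loud assumptions: `SketchLeaderContender` and `SketchResourceManager` are simplified, hypothetical stand-ins for the Flink types, and `runAsync` here simply runs inline, whereas the real endpoint would hand the runnable to its main RPC thread. The sketch only shows the callback logic living in the inner contender class while still going through `runAsync`.

```java
import java.util.UUID;

// Hypothetical, reduced version of the LeaderContender interface from the diff.
interface SketchLeaderContender {
    void grantLeadership(UUID leaderSessionID);
    void revokeLeadership();
}

// Hypothetical stand-in for the ResourceManager endpoint.
class SketchResourceManager {
    private UUID leaderSessionID;

    // Stand-in for the endpoint's runAsync; assumed to run inline in this sketch.
    void runAsync(Runnable runnable) {
        runnable.run();
    }

    UUID getLeaderSessionID() {
        return leaderSessionID;
    }

    SketchLeaderContender contender() {
        return new InnerContender();
    }

    // The callbacks are implemented here rather than delegated to handle* methods
    // on the outer class, but state changes are still wrapped in runAsync.
    private class InnerContender implements SketchLeaderContender {
        @Override
        public void grantLeadership(final UUID newLeaderSessionID) {
            runAsync(() -> leaderSessionID = newLeaderSessionID);
        }

        @Override
        public void revokeLeadership() {
            runAsync(() -> leaderSessionID = null);
        }
    }
}
```

The parameter is named `newLeaderSessionID` so that it does not shadow the outer field, which the inner class can assign to directly.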


---

[GitHub] flink issue #2427: [FLINK-4516] [cluster management]leader election of resou...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2427
  
    @mxm, thanks for your review. I changed the `ResourceManagerHATest` to use a mocked `RpcService` instead of `TestingSerialRpcService`.


---

[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76815420
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc;
    +
    +import akka.dispatch.ExecutionContexts;
    +import akka.dispatch.Futures;
    +import akka.util.Timeout;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.util.DirectExecutorService;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.lang.annotation.Annotation;
    +import java.lang.reflect.InvocationHandler;
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Proxy;
    +import java.util.BitSet;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +/**
    + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
    + */
    +public class TestingSerialRpcService extends TestingRpcService {
    +
    +	private final DirectExecutorService executorService;
    +	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
    +
    +	public TestingSerialRpcService() {
    +		executorService = new DirectExecutorService();
    +		this.registeredConnections = new ConcurrentHashMap<>();
    +	}
    +
    +	@Override
    +	public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
    +		try {
    +			unit.sleep(delay);
    +			runnable.run();
    +		} catch (Throwable e) {
    +			throw new RuntimeException(e);
    +		}
    +	}
    +
    +	@Override
    +	public ExecutionContext getExecutionContext() {
    +		return ExecutionContexts.fromExecutorService(executorService);
    +	}
    +
    +	@Override
    +	public void stopService() {
    +		executorService.shutdown();
    +		registeredConnections.clear();
    +	}
    +
    +	@Override
    +	public void stopServer(RpcGateway selfGateway) {
    +
    +	}
    +
    +	@Override
    +	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
    +		final String address = UUID.randomUUID().toString();
    +
    +		InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
    +		ClassLoader classLoader = getClass().getClassLoader();
    +
    +		@SuppressWarnings("unchecked")
    +		C self = (C) Proxy.newProxyInstance(
    +			classLoader,
    +			new Class<?>[]{
    +				rpcEndpoint.getSelfGatewayType(),
    +				MainThreadExecutor.class,
    +				StartStoppable.class,
    +				RpcGateway.class},
    +			akkaInvocationHandler);
    +
    +		return self;
    +	}
    +
    +	private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
    +
    +		private final T rpcEndpoint;
    +
    +		/** default timeout for asks */
    +		private final Timeout timeout;
    +
    +		private final String address;
    +
    +		private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
    +			this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
    +		}
    +
    +		private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
    +			this.rpcEndpoint = rpcEndpoint;
    +			this.timeout = timeout;
    +			this.address = address;
    +		}
    +
    +		@Override
    +		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    +			Class<?> declaringClass = method.getDeclaringClass();
    +			if (declaringClass.equals(MainThreadExecutor.class) ||
    +				declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
    +				declaringClass.equals(RpcGateway.class)) {
    +				return method.invoke(this, args);
    +			} else {
    +				final String methodName = method.getName();
    +				Class<?>[] parameterTypes = method.getParameterTypes();
    +				Annotation[][] parameterAnnotations = method.getParameterAnnotations();
    +				Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
    +
    +				final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
    +					parameterTypes,
    +					parameterAnnotations,
    +					args);
    +
    +				Class<?> returnType = method.getReturnType();
    +
    +				if (returnType.equals(Future.class)) {
    +					try {
    +						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
    +						return Futures.successful(result);
    +					} catch (Throwable e) {
    +						return Futures.failed(e);
    +					}
    +				} else {
    +					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
    +				}
    +			}
    +		}
    +
    +		/**
    +		 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
    +		 * method with the provided method arguments. If the method has a return value, it is returned
    +		 * to the sender of the call.
    +		 */
    +		private Object handleRpcInvocationSync(final String methodName,
    +			final Class<?>[] parameterTypes,
    +			final Object[] args,
    +			final Timeout futureTimeout) throws Exception {
    +			final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
    +			Object result = rpcMethod.invoke(rpcEndpoint, args);
    +
    +			if (result != null && result instanceof Future) {
    +				Future<?> future = (Future<?>) result;
    +				return Await.result(future, futureTimeout.duration());
    +			} else {
    +				return result;
    +			}
    +		}
    +
    +		@Override
    +		public void runAsync(Runnable runnable) {
    +			runnable.run();
    +		}
    +
    +		@Override
    +		public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
    +			try {
    +				TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis());
    +				return Futures.successful(callable.call());
    +			} catch (Throwable e) {
    +				return Futures.failed(e);
    +			}
    +		}
    +
    +		@Override
    +		public void scheduleRunAsync(final Runnable runnable, final long delay) {
    +			try {
    +				TimeUnit.MILLISECONDS.sleep(delay);
    +				runnable.run();
    +			} catch (Throwable e) {
    +				throw new RuntimeException(e);
    +			}
    +		}
    +
    +		@Override
    +		public void start() {
    --- End diff --
    
    No need to implement `StartStoppable` if you don't use the methods.



[GitHub] flink pull request #2427: [FLINK-4516] [cluster management]leader election o...

Posted by beyond1920 <gi...@git.apache.org>.
GitHub user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76910556
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc;
    +
    +import akka.dispatch.ExecutionContexts;
    +import akka.dispatch.Futures;
    +import akka.util.Timeout;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.util.DirectExecutorService;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.lang.annotation.Annotation;
    +import java.lang.reflect.InvocationHandler;
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Proxy;
    +import java.util.BitSet;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +/**
    + * An RPC Service implementation for testing. This RPC service directly executes all asynchronous calls one by one in the main thread.
    + */
    +public class TestingSerialRpcService extends TestingRpcService {
    +
    +	private final DirectExecutorService executorService;
    +	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
    +
    +	public TestingSerialRpcService() {
    +		executorService = new DirectExecutorService();
    +		this.registeredConnections = new ConcurrentHashMap<>();
    +	}
    +
    +	@Override
    +	public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) {
    +		try {
    +			unit.sleep(delay);
    +			runnable.run();
    +		} catch (Throwable e) {
    +			throw new RuntimeException(e);
    +		}
    +	}
    +
    +	@Override
    +	public ExecutionContext getExecutionContext() {
    +		return ExecutionContexts.fromExecutorService(executorService);
    +	}
    +
    +	@Override
    +	public void stopService() {
    +		executorService.shutdown();
    +		registeredConnections.clear();
    +	}
    +
    +	@Override
    +	public void stopServer(RpcGateway selfGateway) {
    +
    +	}
    +
    +	@Override
    +	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
    +		final String address = UUID.randomUUID().toString();
    +
    +		InvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, rpcEndpoint);
    +		ClassLoader classLoader = getClass().getClassLoader();
    +
    +		@SuppressWarnings("unchecked")
    +		C self = (C) Proxy.newProxyInstance(
    +			classLoader,
    +			new Class<?>[]{
    +				rpcEndpoint.getSelfGatewayType(),
    +				MainThreadExecutor.class,
    +				StartStoppable.class,
    +				RpcGateway.class},
    +			akkaInvocationHandler);
    +
    +		return self;
    +	}
    +
    +	private static class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, MainThreadExecutor, StartStoppable {
    +
    +		private final T rpcEndpoint;
    +
    +		/** default timeout for asks */
    +		private final Timeout timeout;
    +
    +		private final String address;
    +
    +		private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
    +			this(address, rpcEndpoint, new Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
    +		}
    +
    +		private TestingSerialInvocationHandler(String address, T rpcEndpoint, Timeout timeout) {
    +			this.rpcEndpoint = rpcEndpoint;
    +			this.timeout = timeout;
    +			this.address = address;
    +		}
    +
    +		@Override
    +		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    +			Class<?> declaringClass = method.getDeclaringClass();
    +			if (declaringClass.equals(MainThreadExecutor.class) ||
    +				declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
    +				declaringClass.equals(RpcGateway.class)) {
    +				return method.invoke(this, args);
    +			} else {
    +				final String methodName = method.getName();
    +				Class<?>[] parameterTypes = method.getParameterTypes();
    +				Annotation[][] parameterAnnotations = method.getParameterAnnotations();
    +				Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
    +
    +				final Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
    +					parameterTypes,
    +					parameterAnnotations,
    +					args);
    +
    +				Class<?> returnType = method.getReturnType();
    +
    +				if (returnType.equals(Future.class)) {
    +					try {
    +						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
    +						return Futures.successful(result);
    +					} catch (Throwable e) {
    +						return Futures.failed(e);
    +					}
    +				} else {
    +					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
    +				}
    +			}
    +		}
    +
    +		/**
    +		 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
    +		 * method with the provided method arguments. If the method has a return value, it is returned
    +		 * to the sender of the call.
    +		 */
    +		private Object handleRpcInvocationSync(final String methodName,
    +			final Class<?>[] parameterTypes,
    +			final Object[] args,
    +			final Timeout futureTimeout) throws Exception {
    +			final Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
    +			Object result = rpcMethod.invoke(rpcEndpoint, args);
    +
    +			if (result != null && result instanceof Future) {
    +				Future<?> future = (Future<?>) result;
    +				return Await.result(future, futureTimeout.duration());
    +			} else {
    +				return result;
    +			}
    +		}
    +
    +		@Override
    +		public void runAsync(Runnable runnable) {
    +			runnable.run();
    +		}
    +
    +		@Override
    +		public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
    +			try {
    +				TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis());
    +				return Futures.successful(callable.call());
    +			} catch (Throwable e) {
    +				return Futures.failed(e);
    +			}
    +		}
    +
    +		@Override
    +		public void scheduleRunAsync(final Runnable runnable, final long delay) {
    +			try {
    +				TimeUnit.MILLISECONDS.sleep(delay);
    +				runnable.run();
    +			} catch (Throwable e) {
    +				throw new RuntimeException(e);
    +			}
    +		}
    +
    +		@Override
    +		public void start() {
    --- End diff --
    
    The `start` method in the `RpcEndpoint` class calls `((StartStoppable) self).start()`, so `StartStoppable` needs to be implemented.
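
The point in the reply above can be illustrated with a minimal, self-contained sketch of the same dynamic-proxy pattern. It is not the Flink code: `Gateway`, `Endpoint`, `Handler`, and the local `StartStoppable` interface are hypothetical stand-ins for the types in the diff. The sketch assumes, as the diff does, that calls declared on `StartStoppable` are served by the invocation handler itself via `method.invoke(this, args)`, which only works if the handler implements that interface and the proxy lists it.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class StartStoppableProxySketch {

    /** Hypothetical stand-in for Flink's StartStoppable. */
    public interface StartStoppable {
        void start();
        void stop();
    }

    /** Hypothetical gateway interface exposed to callers. */
    public interface Gateway {
        String ping();
    }

    /** Hypothetical endpoint that receives the forwarded rpc calls. */
    public static class Endpoint {
        public String ping() {
            return "pong";
        }
    }

    /** Handler that serves StartStoppable itself and forwards everything else. */
    public static class Handler implements InvocationHandler, StartStoppable {
        private final Endpoint endpoint = new Endpoint();
        private boolean started;

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass().equals(StartStoppable.class)) {
                // Dispatch to this handler. If Handler did not implement
                // StartStoppable, Method.invoke would throw IllegalArgumentException.
                return method.invoke(this, args);
            }
            // All other calls are looked up by name on the endpoint and run synchronously.
            Method target = Endpoint.class.getMethod(method.getName(), method.getParameterTypes());
            return target.invoke(endpoint, args);
        }

        @Override
        public void start() {
            started = true;
        }

        @Override
        public void stop() {
            started = false;
        }

        public boolean isStarted() {
            return started;
        }
    }

    /** Builds the "self" proxy; StartStoppable must be listed or the cast below fails. */
    public static Gateway newSelf(Handler handler) {
        return (Gateway) Proxy.newProxyInstance(
            StartStoppableProxySketch.class.getClassLoader(),
            new Class<?>[] {Gateway.class, StartStoppable.class},
            handler);
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        Gateway self = newSelf(handler);
        // Mirrors the ((StartStoppable) self).start() call described in the reply.
        ((StartStoppable) self).start();
        System.out.println(handler.isStarted() + " " + self.ping()); // prints "true pong"
    }
}
```

If `StartStoppable.class` were dropped from the interface array, the cast in `main` would fail with a `ClassCastException`, which is the situation the reply describes for the testing service.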

