You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/08/30 07:24:14 UTC

[flink] 01/02: [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a837659b71dd02ef6775bd2a4de331aab3ddc2e
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Aug 21 11:19:27 2019 +0200

    [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint
    
    To better reflect the lifecycle of RpcEndpoint, we suggest introducing a running state for it.
    The non-running state can be used, e.g., to decide how to react to API calls
    when it is already known that the RpcEndpoint is terminating.
---
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 ++++++++++++++++++++--
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  4 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++++++++++++++++++
 3 files changed, 153 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 5c14a54..7c7a4c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -56,6 +56,36 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The RPC endpoint provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
  * and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread.
+ *
+ * <h1>Lifecycle</h1>
+ *
+ * <p>The RPC endpoint has the following stages:
+ * <ul>
+ *    <li>
+ *        The RPC endpoint is created in a non-running state and does not serve any RPC requests.
+ *    </li>
+ *    <li>
+ *        Calling the {@link #start()} method triggers the start of the RPC endpoint and schedules a call of the
+ *        overridable {@link #onStart()} method in the main thread.
+ *    </li>
+ *    <li>
+ *        When the start operation ends, the RPC endpoint is moved to the running state
+ *        and starts to serve and complete RPC requests.
+ *    </li>
+ *    <li>
+ *        Calling the {@link #closeAsync()} method triggers the termination of the RPC endpoint and schedules a call
+ *        of the overridable {@link #onStop()} method in the main thread.
+ *    </li>
+ *    <li>
+ *        When the {@link #onStop()} method is called, it triggers an asynchronous stop operation.
+ *        The RPC endpoint is not in the running state anymore, but it continues to serve RPC requests.
+ *    </li>
+ *    <li>
+ *        When the asynchronous stop operation ends, the RPC endpoint terminates completely
+ *        and does not serve RPC requests anymore.
+ *    </li>
+ * </ul>
+ * <p>The running state can be queried in an RPC method handler or in the main thread by calling the {@link #isRunning()} method.
  */
 public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 
@@ -80,6 +110,13 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	private final MainThreadExecutor mainThreadExecutor;
 
 	/**
+	 * Indicates whether the RPC endpoint is started and not stopped or being stopped.
+	 *
+	 * <p>IMPORTANT: the running state is not thread-safe and must only be accessed from the main thread of the RPC endpoint.
+	 */
+	private boolean isRunning;
+
+	/**
 	 * Initializes the RPC endpoint.
 	 *
 	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
@@ -112,12 +149,22 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 		return endpointId;
 	}
 
+	/**
+	 * Returns whether the RPC endpoint is started and not stopped or being stopped.
+	 *
+	 * @return whether the RPC endpoint is started and not stopped or being stopped.
+	 */
+	protected boolean isRunning() {
+		validateRunsInMainThread();
+		return isRunning;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Start & shutdown & lifecycle callbacks
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
+	 * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
 	 * to process remote procedure calls.
 	 *
 	 * @throws Exception indicating that something went wrong while starting the RPC endpoint
@@ -127,20 +174,33 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	}
 
 	/**
-	 * User overridable callback.
+	 * Internal method which is called by the RpcService implementation to start the RpcEndpoint.
+	 *
+	 * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs,
+	 * then the rpc endpoint will automatically terminate.
+	 */
+	public final void internalCallOnStart() throws Exception {
+		validateRunsInMainThread();
+		isRunning = true;
+		onStart();
+	}
+
+	/**
+	 * User overridable callback which is called from {@link #internalCallOnStart()}.
 	 *
 	 * <p>This method is called when the RpcEndpoint is being started. The method is guaranteed
 	 * to be executed in the main thread context and can be used to start the rpc endpoint in the
 	 * context of the rpc endpoint's main thread.
 	 *
 	 * <p>IMPORTANT: This method should never be called directly by the user.
+	 *
 	 * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs,
 	 * then the rpc endpoint will automatically terminate.
 	 */
-	public void onStart() throws Exception {}
+	protected void onStart() throws Exception {}
 
 	/**
-	 * Stops the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is
+	 * Triggers stop of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is
 	 * no longer ready to process remote procedure calls.
 	 */
 	protected final void stop() {
@@ -148,7 +208,20 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	}
 
 	/**
-	 * User overridable callback.
+	 * Internal method which is called by the RpcService implementation to stop the RpcEndpoint.
+	 *
+	 * @return Future which is completed once all post stop actions are completed. If an error
+	 * occurs this future is completed exceptionally
+	 */
+	public final CompletableFuture<Void> internalCallOnStop() {
+		validateRunsInMainThread();
+		CompletableFuture<Void> stopFuture = onStop();
+		isRunning = false;
+		return stopFuture;
+	}
+
+	/**
+	 * User overridable callback which is called from {@link #internalCallOnStop()}.
 	 *
 	 * <p>This method is called when the RpcEndpoint is being shut down. The method is guaranteed
 	 * to be executed in the main thread context and can be used to clean up internal state.
@@ -158,7 +231,7 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	 * @return Future which is completed once all post stop actions are completed. If an error
 	 * occurs this future is completed exceptionally
 	 */
-	public CompletableFuture<Void> onStop() {
+	protected CompletableFuture<Void> onStop() {
 		return CompletableFuture.completedFuture(null);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 71de3a8..25fb38b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -506,7 +506,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
 			CompletableFuture<Void> terminationFuture;
 			try {
-				terminationFuture = akkaRpcActor.rpcEndpoint.onStop();
+				terminationFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop();
 			} catch (Throwable t) {
 				terminationFuture = FutureUtils.completedExceptionally(
 					new AkkaRpcException(
@@ -541,7 +541,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 			akkaRpcActor.mainThreadValidator.enterMainThread();
 
 			try {
-				akkaRpcActor.rpcEndpoint.onStart();
+				akkaRpcActor.rpcEndpoint.internalCallOnStart();
 			} catch (Throwable throwable) {
 				akkaRpcActor.stop(
 					RpcEndpointTerminationResult.failure(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 31b5407..532e9f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -33,9 +33,14 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
@@ -135,6 +140,42 @@ public class RpcEndpointTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the RPC endpoint reports the running state after it has been started.
+	 */
+	@Test
+	public void testRunningState() throws InterruptedException, ExecutionException, TimeoutException {
+		RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(
+			rpcService,
+			CompletableFuture.completedFuture(null));
+		RunningStateTestingEndpointGateway gateway = endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
+
+		try {
+			endpoint.start();
+			assertThat(gateway.queryIsRunningFlag().get(), is(true));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+		}
+	}
+
+	/**
+	 * Tests that the RPC endpoint reports the non-running state while it is being stopped.
+	 */
+	@Test
+	public void testNotRunningState() throws InterruptedException, ExecutionException, TimeoutException {
+		CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+		RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(rpcService, stopFuture);
+		RunningStateTestingEndpointGateway gateway = endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
+
+		endpoint.start();
+		CompletableFuture<Void> terminationFuture = endpoint.closeAndWaitUntilOnStopCalled();
+
+		assertThat(gateway.queryIsRunningFlag().get(), is(false));
+
+		stopFuture.complete(null);
+		terminationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
 	public interface BaseGateway extends RpcGateway {
 		CompletableFuture<Integer> foobar();
 	}
@@ -186,4 +227,35 @@ public class RpcEndpointTest extends TestLogger {
 			return CompletableFuture.completedFuture(fooString);
 		}
 	}
+
+	public interface RunningStateTestingEndpointGateway extends RpcGateway {
+		CompletableFuture<Boolean> queryIsRunningFlag();
+	}
+
+	private static final class RunningStateTestingEndpoint extends RpcEndpoint implements RunningStateTestingEndpointGateway {
+		private final CountDownLatch onStopCalled;
+		private final CompletableFuture<Void> stopFuture;
+
+		RunningStateTestingEndpoint(RpcService rpcService, CompletableFuture<Void> stopFuture) {
+			super(rpcService);
+			this.stopFuture = stopFuture;
+			this.onStopCalled = new CountDownLatch(1);
+		}
+
+		@Override
+		public CompletableFuture<Void> onStop() {
+			onStopCalled.countDown();
+			return stopFuture;
+		}
+
+		CompletableFuture<Void> closeAndWaitUntilOnStopCalled() throws InterruptedException {
+			CompletableFuture<Void> terminationFuture = closeAsync();
+			onStopCalled.await();
+			return terminationFuture;
+		}
+
+		public CompletableFuture<Boolean> queryIsRunningFlag() {
+			return CompletableFuture.completedFuture(isRunning());
+		}
+	}
 }