You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/08/30 07:24:14 UTC
[flink] 01/02: [FLINK-13819][coordination] Introduce isRunning
state for RpcEndpoint
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a837659b71dd02ef6775bd2a4de331aab3ddc2e
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Aug 21 11:19:27 2019 +0200
[FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint
To better reflect the lifecycle of RpcEndpoint, we suggest introducing a running state for it.
We can use the non-running state, e.g., to decide how to react to API
calls if it is already known that the RpcEndpoint is terminating.
---
.../org/apache/flink/runtime/rpc/RpcEndpoint.java | 85 ++++++++++++++++++++--
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 4 +-
.../apache/flink/runtime/rpc/RpcEndpointTest.java | 72 ++++++++++++++++++
3 files changed, 153 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 5c14a54..7c7a4c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -56,6 +56,36 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*
* <p>The RPC endpoint provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
* and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread.
+ *
+ * <h1>Lifecycle</h1>
+ *
+ * <p>The RPC endpoint has the following stages:
+ * <ul>
+ * <li>
+ * The RPC endpoint is created in a non-running state and does not serve any RPC requests.
+ * </li>
+ * <li>
+ * Calling the {@link #start()} method triggers the start of the RPC endpoint and schedules a call of the
+ * overridable {@link #onStart()} method in the main thread.
+ * </li>
+ * <li>
+ * When the start operation ends the RPC endpoint is moved to the running state
+ * and starts to serve and complete RPC requests.
+ * </li>
+ * <li>
+ * Calling the {@link #closeAsync()} method triggers the termination of the RPC endpoint and schedules a call of the
+ * overridable {@link #onStop()} method in the main thread.
+ * </li>
+ * <li>
+ * When {@link #onStop()} method is called, it triggers an asynchronous stop operation.
+ * The RPC endpoint is not in the running state anymore but it continues to serve RPC requests.
+ * </li>
+ * <li>
+ * When the asynchronous stop operation ends, the RPC endpoint terminates completely
+ * and does not serve RPC requests anymore.
+ * </li>
+ * </ul>
+ * The running state can be queried in an RPC method handler or in the main thread by calling the {@link #isRunning()} method.
*/
public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
@@ -80,6 +110,13 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
private final MainThreadExecutor mainThreadExecutor;
/**
+ * Indicates whether the RPC endpoint is started and not stopped or being stopped.
+ *
+ * <p>IMPORTANT: the running state is not thread safe and can be used only in the main thread of the rpc endpoint.
+ */
+ private boolean isRunning;
+
+ /**
* Initializes the RPC endpoint.
*
* @param rpcService The RPC server that dispatches calls to this RPC endpoint.
@@ -112,12 +149,22 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
return endpointId;
}
+ /**
+ * Returns whether the RPC endpoint is started and not stopped or being stopped.
+ *
+ * <p>The underlying flag is documented as not thread safe, so this method first calls
+ * {@code validateRunsInMainThread()} and is meant to be used only from the main thread
+ * of the RPC endpoint (for example from an RPC method handler).
+ *
+ * @return whether the RPC endpoint is started and not stopped or being stopped.
+ */
+ protected boolean isRunning() {
+ validateRunsInMainThread();
+ return isRunning;
+ }
+
// ------------------------------------------------------------------------
// Start & shutdown & lifecycle callbacks
// ------------------------------------------------------------------------
/**
- * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
+ * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
* to process remote procedure calls.
*
* @throws Exception indicating that something went wrong while starting the RPC endpoint
@@ -127,20 +174,33 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
}
/**
- * User overridable callback.
+ * Internal method which is called by the RpcService implementation to start the RpcEndpoint.
+ *
+ * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs,
+ * then the rpc endpoint will automatically terminate.
+ */
+ public final void internalCallOnStart() throws Exception {
+ validateRunsInMainThread();
+ isRunning = true; // set before onStart() runs, so isRunning() already returns true inside onStart()
+ onStart(); // NOTE(review): the flag is not reset here if onStart() throws; the caller is expected to terminate the endpoint
+ }
+
+ /**
+ * User overridable callback which is called from {@link #internalCallOnStart()}.
*
* <p>This method is called when the RpcEndpoint is being started. The method is guaranteed
* to be executed in the main thread context and can be used to start the rpc endpoint in the
* context of the rpc endpoint's main thread.
*
* <p>IMPORTANT: This method should never be called directly by the user.
+ *
* @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs,
* then the rpc endpoint will automatically terminate.
*/
- public void onStart() throws Exception {}
+ protected void onStart() throws Exception {}
/**
- * Stops the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is
+ * Triggers stop of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is
* no longer ready to process remote procedure calls.
*/
protected final void stop() {
@@ -148,7 +208,20 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
}
/**
- * User overridable callback.
+ * Internal method which is called by the RpcService implementation to stop the RpcEndpoint.
+ *
+ * @return Future which is completed once all post stop actions are completed. If an error
+ * occurs this future is completed exceptionally
+ */
+ public final CompletableFuture<Void> internalCallOnStop() {
+ validateRunsInMainThread();
+ CompletableFuture<Void> stopFuture = onStop(); // onStop() itself still observes isRunning == true
+ isRunning = false; // cleared as soon as onStop() returns, without waiting for stopFuture; NOTE(review): skipped if onStop() throws
+ return stopFuture;
+ }
+
+ /**
+ * User overridable callback which is called from {@link #internalCallOnStop()}.
*
* <p>This method is called when the RpcEndpoint is being shut down. The method is guaranteed
* to be executed in the main thread context and can be used to clean up internal state.
@@ -158,7 +231,7 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
* @return Future which is completed once all post stop actions are completed. If an error
* occurs this future is completed exceptionally
*/
- public CompletableFuture<Void> onStop() {
+ protected CompletableFuture<Void> onStop() {
return CompletableFuture.completedFuture(null);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 71de3a8..25fb38b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -506,7 +506,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
CompletableFuture<Void> terminationFuture;
try {
- terminationFuture = akkaRpcActor.rpcEndpoint.onStop();
+ terminationFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop();
} catch (Throwable t) {
terminationFuture = FutureUtils.completedExceptionally(
new AkkaRpcException(
@@ -541,7 +541,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
akkaRpcActor.mainThreadValidator.enterMainThread();
try {
- akkaRpcActor.rpcEndpoint.onStart();
+ akkaRpcActor.rpcEndpoint.internalCallOnStart();
} catch (Throwable throwable) {
akkaRpcActor.stop(
RpcEndpointTerminationResult.failure(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 31b5407..532e9f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -33,9 +33,14 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/**
@@ -135,6 +140,42 @@ public class RpcEndpointTest extends TestLogger {
}
}
+ /**
+ * Tests that the RPC endpoint is running after it has been started.
+ *
+ * <p>The flag is queried through the endpoint's self gateway instead of calling
+ * {@code isRunning()} directly; presumably this lets the query run as an RPC in the
+ * endpoint's main thread, which {@code isRunning()} validates.
+ */
+ @Test
+ public void testRunningState() throws InterruptedException, ExecutionException, TimeoutException {
+ RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(
+ rpcService,
+ CompletableFuture.completedFuture(null)); // onStop() will return this already-completed future
+ RunningStateTestingEndpointGateway gateway = endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
+
+ try {
+ endpoint.start();
+ assertThat(gateway.queryIsRunningFlag().get(), is(true));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT); // always shut the endpoint down, even if the assertion fails
+ }
+ }
+
+ /**
+ * Tests that the RPC endpoint is not running if it is being stopped.
+ *
+ * <p>The endpoint is given a stop future that the test completes itself, so the endpoint
+ * stays in the "being stopped" phase while the running flag is queried.
+ *
+ * <p>NOTE(review): there is no try/finally here, so a failing assertion leaves
+ * {@code stopFuture} uncompleted, and the {@code get()} on the query has no timeout.
+ */
+ @Test
+ public void testNotRunningState() throws InterruptedException, ExecutionException, TimeoutException {
+ CompletableFuture<Void> stopFuture = new CompletableFuture<>(); // completed manually below to finish the stop
+ RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(rpcService, stopFuture);
+ RunningStateTestingEndpointGateway gateway = endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
+
+ endpoint.start();
+ CompletableFuture<Void> terminationFuture = endpoint.closeAndWaitUntilOnStopCalled(); // blocks until onStop() has been entered
+
+ assertThat(gateway.queryIsRunningFlag().get(), is(false)); // stop has begun but stopFuture is still pending
+
+ stopFuture.complete(null); // let the asynchronous stop operation finish
+ terminationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
public interface BaseGateway extends RpcGateway {
CompletableFuture<Integer> foobar();
}
@@ -186,4 +227,35 @@ public class RpcEndpointTest extends TestLogger {
return CompletableFuture.completedFuture(fooString);
}
}
+
+ public interface RunningStateTestingEndpointGateway extends RpcGateway { // gateway used by the running-state tests
+ CompletableFuture<Boolean> queryIsRunningFlag(); // future carrying the endpoint's isRunning() value
+ }
+
+ private static final class RunningStateTestingEndpoint extends RpcEndpoint implements RunningStateTestingEndpointGateway { // test endpoint whose stop future is supplied by the test
+ private final CountDownLatch onStopCalled; // released once onStop() has been entered
+ private final CompletableFuture<Void> stopFuture; // returned from onStop(); the caller decides when it completes
+
+ RunningStateTestingEndpoint(RpcService rpcService, CompletableFuture<Void> stopFuture) {
+ super(rpcService);
+ this.stopFuture = stopFuture;
+ this.onStopCalled = new CountDownLatch(1);
+ }
+
+ @Override
+ public CompletableFuture<Void> onStop() {
+ onStopCalled.countDown(); // signal closeAndWaitUntilOnStopCalled() that stopping has begun
+ return stopFuture;
+ }
+
+ CompletableFuture<Void> closeAndWaitUntilOnStopCalled() throws InterruptedException {
+ CompletableFuture<Void> terminationFuture = closeAsync();
+ onStopCalled.await(); // block the calling thread until onStop() has been invoked
+ return terminationFuture;
+ }
+
+ public CompletableFuture<Boolean> queryIsRunningFlag() { // implements RunningStateTestingEndpointGateway#queryIsRunningFlag()
+ return CompletableFuture.completedFuture(isRunning());
+ }
+ }
}