You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/08/30 07:24:13 UTC

[flink] branch release-1.9 updated (c175cc4 -> b682e9a)

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c175cc4  [hotfix][FLINK-13901][docs] Fix documentation links check errors
     new 1a83765  [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint
     new b682e9a  [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 ++++++++++++++++++++--
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  4 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 14 ++--
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++++++++++++++++++
 4 files changed, 160 insertions(+), 15 deletions(-)


[flink] 01/02: [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a837659b71dd02ef6775bd2a4de331aab3ddc2e
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Aug 21 11:19:27 2019 +0200

    [FLINK-13819][coordination] Introduce isRunning state for RpcEndpoint
    
    To better reflect the lifecycle of RpcEndpoint, we suggest introducing its running state.
    We can use the non-running state, e.g., to decide how to react to API
    calls if it is already known that the RpcEndpoint is terminating.
---
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java  | 85 ++++++++++++++++++++--
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  4 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 72 ++++++++++++++++++
 3 files changed, 153 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 5c14a54..7c7a4c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -56,6 +56,36 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The RPC endpoint provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
  * and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread.
+ *
+ * <h1>Lifecycle</h1>
+ *
+ * <p>The RPC endpoint has the following stages:
+ * <ul>
+ *    <li>
+ *        The RPC endpoint is created in a non-running state and does not serve any RPC requests.
+ *    </li>
+ *    <li>
+ *        Calling the {@link #start()} method triggers the start of the RPC endpoint and schedules overridable
+ *        {@link #onStart()} method call to the main thread.
+ *    </li>
+ *    <li>
+ *        When the start operation ends the RPC endpoint is moved to the running state
+ *        and starts to serve and complete RPC requests.
+ *    </li>
+ *    <li>
+ *        Calling the {@link #closeAsync()} method triggers the termination of the RPC endpoint and schedules overridable
+ *        {@link #onStop()} method call to the main thread.
+ *    </li>
+ *    <li>
+ *        When {@link #onStop()} method is called, it triggers an asynchronous stop operation.
+ *        The RPC endpoint is not in the running state anymore but it continues to serve RPC requests.
+ *    </li>
+ *    <li>
+ *        When the asynchronous stop operation ends, the RPC endpoint terminates completely
+ *        and does not serve RPC requests anymore.
+ *    </li>
+ * </ul>
+ * The running state can be queried in a RPC method handler or in the main thread by calling {@link #isRunning()} method.
  */
 public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 
@@ -80,6 +110,13 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	private final MainThreadExecutor mainThreadExecutor;
 
 	/**
+	 * Indicates whether the RPC endpoint is started and not stopped or being stopped.
+	 *
+	 * <p>IMPORTANT: the running state is not thread safe and can be used only in the main thread of the rpc endpoint.
+	 */
+	private boolean isRunning;
+
+	/**
 	 * Initializes the RPC endpoint.
 	 *
 	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
@@ -112,12 +149,22 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 		return endpointId;
 	}
 
+	/**
+	 * Returns whether the RPC endpoint is started and not stopped or being stopped.
+	 *
+	 * @return whether the RPC endpoint is started and not stopped or being stopped.
+	 */
+	protected boolean isRunning() {
+		validateRunsInMainThread();
+		return isRunning;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Start & shutdown & lifecycle callbacks
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Starts the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
+	 * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
 	 * to process remote procedure calls.
 	 *
 	 * @throws Exception indicating that something went wrong while starting the RPC endpoint
@@ -127,20 +174,33 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	}
 
 	/**
-	 * User overridable callback.
+	 * Internal method which is called by the RpcService implementation to start the RpcEndpoint.
+	 *
+	 * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs,
+	 * then the rpc endpoint will automatically terminate.
+	 */
+	public final void internalCallOnStart() throws Exception {
+		validateRunsInMainThread();
+		isRunning = true;
+		onStart();
+	}
+
+	/**
+	 * User overridable callback which is called from {@link #internalCallOnStart()}.
 	 *
 	 * <p>This method is called when the RpcEndpoint is being started. The method is guaranteed
 	 * to be executed in the main thread context and can be used to start the rpc endpoint in the
 	 * context of the rpc endpoint's main thread.
 	 *
 	 * <p>IMPORTANT: This method should never be called directly by the user.
+	 *
 	 * @throws Exception indicating that the rpc endpoint could not be started. If an exception occurs,
 	 * then the rpc endpoint will automatically terminate.
 	 */
-	public void onStart() throws Exception {}
+	protected void onStart() throws Exception {}
 
 	/**
-	 * Stops the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is
+	 * Triggers stop of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is
 	 * no longer ready to process remote procedure calls.
 	 */
 	protected final void stop() {
@@ -148,7 +208,20 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	}
 
 	/**
-	 * User overridable callback.
+	 * Internal method which is called by the RpcService implementation to stop the RpcEndpoint.
+	 *
+	 * @return Future which is completed once all post stop actions are completed. If an error
+	 * occurs this future is completed exceptionally
+	 */
+	public final CompletableFuture<Void> internalCallOnStop() {
+		validateRunsInMainThread();
+		CompletableFuture<Void> stopFuture = onStop();
+		isRunning = false;
+		return stopFuture;
+	}
+
+	/**
+	 * User overridable callback which is called from {@link #internalCallOnStop()}.
 	 *
 	 * <p>This method is called when the RpcEndpoint is being shut down. The method is guaranteed
 	 * to be executed in the main thread context and can be used to clean up internal state.
@@ -158,7 +231,7 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
 	 * @return Future which is completed once all post stop actions are completed. If an error
 	 * occurs this future is completed exceptionally
 	 */
-	public CompletableFuture<Void> onStop() {
+	protected CompletableFuture<Void> onStop() {
 		return CompletableFuture.completedFuture(null);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 71de3a8..25fb38b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -506,7 +506,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
 			CompletableFuture<Void> terminationFuture;
 			try {
-				terminationFuture = akkaRpcActor.rpcEndpoint.onStop();
+				terminationFuture = akkaRpcActor.rpcEndpoint.internalCallOnStop();
 			} catch (Throwable t) {
 				terminationFuture = FutureUtils.completedExceptionally(
 					new AkkaRpcException(
@@ -541,7 +541,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 			akkaRpcActor.mainThreadValidator.enterMainThread();
 
 			try {
-				akkaRpcActor.rpcEndpoint.onStart();
+				akkaRpcActor.rpcEndpoint.internalCallOnStart();
 			} catch (Throwable throwable) {
 				akkaRpcActor.stop(
 					RpcEndpointTerminationResult.failure(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 31b5407..532e9f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -33,9 +33,14 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
@@ -135,6 +140,42 @@ public class RpcEndpointTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the RPC is running after it has been started.
+	 */
+	@Test
+	public void testRunningState() throws InterruptedException, ExecutionException, TimeoutException {
+		RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(
+			rpcService,
+			CompletableFuture.completedFuture(null));
+		RunningStateTestingEndpointGateway gateway = endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
+
+		try {
+			endpoint.start();
+			assertThat(gateway.queryIsRunningFlag().get(), is(true));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+		}
+	}
+
+	/**
+	 * Tests that the RPC is not running if it is being stopped.
+	 */
+	@Test
+	public void testNotRunningState() throws InterruptedException, ExecutionException, TimeoutException {
+		CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+		RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(rpcService, stopFuture);
+		RunningStateTestingEndpointGateway gateway = endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
+
+		endpoint.start();
+		CompletableFuture<Void> terminationFuture = endpoint.closeAndWaitUntilOnStopCalled();
+
+		assertThat(gateway.queryIsRunningFlag().get(), is(false));
+
+		stopFuture.complete(null);
+		terminationFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+
 	public interface BaseGateway extends RpcGateway {
 		CompletableFuture<Integer> foobar();
 	}
@@ -186,4 +227,35 @@ public class RpcEndpointTest extends TestLogger {
 			return CompletableFuture.completedFuture(fooString);
 		}
 	}
+
+	public interface RunningStateTestingEndpointGateway extends RpcGateway {
+		CompletableFuture<Boolean> queryIsRunningFlag();
+	}
+
+	private static final class RunningStateTestingEndpoint extends RpcEndpoint implements RunningStateTestingEndpointGateway {
+		private final CountDownLatch onStopCalled;
+		private final CompletableFuture<Void> stopFuture;
+
+		RunningStateTestingEndpoint(RpcService rpcService, CompletableFuture<Void> stopFuture) {
+			super(rpcService);
+			this.stopFuture = stopFuture;
+			this.onStopCalled = new CountDownLatch(1);
+		}
+
+		@Override
+		public CompletableFuture<Void> onStop() {
+			onStopCalled.countDown();
+			return stopFuture;
+		}
+
+		CompletableFuture<Void> closeAndWaitUntilOnStopCalled() throws InterruptedException {
+			CompletableFuture<Void> terminationFuture = closeAsync();
+			onStopCalled.await();
+			return terminationFuture;
+		}
+
+		public CompletableFuture<Boolean> queryIsRunningFlag() {
+			return CompletableFuture.completedFuture(isRunning());
+		}
+	}
 }


[flink] 02/02: [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect

Posted by az...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b682e9a316669f30fa4bfcaa32a8fa0d3ac1dc02
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Mon Aug 19 16:20:39 2019 +0200

    [FLINK-13769][Coordination] Close RM connection in TaskExecutor.onStop and do not reconnect
    
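    Closing the ResourceManager connection in onStop, and not reconnecting once the TaskExecutor is no longer running, prevents the JobManager (JM) from acquiring slots that belong to the stopped TaskExecutor (TM).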
---
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java    | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c8bcaf9..b1238ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -340,11 +340,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 		Throwable jobManagerDisconnectThrowable = null;
 
-		if (resourceManagerConnection != null) {
-			resourceManagerConnection.close();
-		}
-
 		FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
+
+		closeResourceManagerConnection(cause);
+
 		for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) {
 			try {
 				disassociateFromJobManager(jobManagerConnection, cause);
@@ -958,7 +957,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	@Override
 	public void disconnectResourceManager(Exception cause) {
-		reconnectToResourceManager(cause);
+		if (isRunning()) {
+			reconnectToResourceManager(cause);
+		}
 	}
 
 	// ======================================================================
@@ -986,6 +987,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 
 	private void reconnectToResourceManager(Exception cause) {
 		closeResourceManagerConnection(cause);
+		startRegistrationTimeout();
 		tryConnectToResourceManager();
 	}
 
@@ -1098,8 +1100,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
-
-		startRegistrationTimeout();
 	}
 
 	private void startRegistrationTimeout() {