You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the "here" hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by tr...@apache.org on 2020/03/31 10:46:18 UTC

[flink] branch release-1.9 updated (9b2de89 -> b71c400)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9b2de89  [FLINK-16018] Increase default value of web.timeout to 10 minutes
     new 7840e60  [hotfix][rpc] Add proper error reporting to AkkaRpcActor#handleControlMessage
     new b71c400  [FLINK-16703][rpc] Set AkkaRpcActor state to TERMINATING when terminating

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       | 64 +++++++++++++---------
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 64 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 27 deletions(-)


[flink] 02/02: [FLINK-16703][rpc] Set AkkaRpcActor state to TERMINATING when terminating

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b71c4002d3942f48807d2b5c0ac64ee3e55b2f59
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Mar 27 14:34:07 2020 +0100

    [FLINK-16703][rpc] Set AkkaRpcActor state to TERMINATING when terminating
    
    This commit fixes a bug where we did not update the state of the AkkaRpcActor
    in case of terminating it. Moreover, this commit fixes the problem that the
    onStop action could have been called multiple times. Last but not least, it
    changes the enum names of the state implementations for better diagnostics.
    
    This closes #11549.
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  5 ++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 64 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 0397172..7ad3776 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -578,6 +578,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 		TERMINATING;
 
 		@Override
+		public State terminate(AkkaRpcActor<?> akkaRpcActor) {
+			return TERMINATING;
+		}
+
+		@Override
 		public boolean isRunning() {
 			return true;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index c010810..c912c79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
@@ -397,6 +398,38 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that multiple termination calls won't trigger the onStop action multiple times.
+	 * Note that this test is a probabilistic test which only fails sometimes without the fix.
+	 * See FLINK-16703.
+	 */
+	@Test
+	public void callsOnStopOnlyOnce() throws Exception {
+		final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
+		final OnStopCountingRpcEndpoint endpoint = new OnStopCountingRpcEndpoint(akkaRpcService, onStopFuture);
+
+		try {
+			endpoint.start();
+
+			final AkkaBasedEndpoint selfGateway = endpoint.getSelfGateway(AkkaBasedEndpoint.class);
+
+			// try to terminate the actor twice
+			selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
+			selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
+
+			endpoint.waitUntilOnStopHasBeenCalled();
+
+			onStopFuture.complete(null);
+
+			endpoint.getTerminationFuture().get();
+
+			assertThat(endpoint.getNumOnStopCalls(), is(1));
+		} finally {
+			onStopFuture.complete(null);
+			RpcUtils.terminateRpcEndpoint(endpoint, timeout);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Actors and Interfaces
 	// ------------------------------------------------------------------------
@@ -611,4 +644,35 @@ public class AkkaRpcActorTest extends TestLogger {
 			countDownLatch.await();
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class OnStopCountingRpcEndpoint extends RpcEndpoint {
+
+		private final AtomicInteger numOnStopCalls = new AtomicInteger(0);
+
+		private final OneShotLatch onStopHasBeenCalled = new OneShotLatch();
+
+		private final CompletableFuture<Void> onStopFuture;
+
+		private OnStopCountingRpcEndpoint(RpcService rpcService, CompletableFuture<Void> onStopFuture) {
+			super(rpcService);
+			this.onStopFuture = onStopFuture;
+		}
+
+		@Override
+		protected CompletableFuture<Void> onStop() {
+			onStopHasBeenCalled.trigger();
+			numOnStopCalls.incrementAndGet();
+			return onStopFuture;
+		}
+
+		private int getNumOnStopCalls() {
+			return numOnStopCalls.get();
+		}
+
+		private void waitUntilOnStopHasBeenCalled() throws InterruptedException {
+			onStopHasBeenCalled.await();
+		}
+	}
 }


[flink] 01/02: [hotfix][rpc] Add proper error reporting to AkkaRpcActor#handleControlMessage

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7840e60b3c5f3caf63bb617a68857a6ce15c30aa
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Mar 27 14:30:35 2020 +0100

    [hotfix][rpc] Add proper error reporting to AkkaRpcActor#handleControlMessage
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       | 59 ++++++++++++----------
 1 file changed, 32 insertions(+), 27 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 25fb38b..0397172 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -119,7 +119,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 		this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(
 			new AkkaRpcException(
 				String.format("RpcEndpoint %s has not been properly stopped.", rpcEndpoint.getEndpointId())));
-		this.state = StoppedState.INSTANCE;
+		this.state = StoppedState.STOPPED;
 	}
 
 	@Override
@@ -164,18 +164,23 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 	}
 
 	private void handleControlMessage(ControlMessages controlMessage) {
-		switch (controlMessage) {
-			case START:
-				state = state.start(this);
-				break;
-			case STOP:
-				state = state.stop();
-				break;
-			case TERMINATE:
-				state.terminate(this);
-				break;
-			default:
-				handleUnknownControlMessage(controlMessage);
+		try {
+			switch (controlMessage) {
+				case START:
+					state = state.start(this);
+					break;
+				case STOP:
+					state = state.stop();
+					break;
+				case TERMINATE:
+					state = state.terminate(this);
+					break;
+				default:
+					handleUnknownControlMessage(controlMessage);
+			}
+		} catch (Exception e) {
+			this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);
+			throw e;
 		}
 	}
 
@@ -462,19 +467,19 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
 	interface State {
 		default State start(AkkaRpcActor<?> akkaRpcActor) {
-			throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.INSTANCE));
+			throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.STARTED));
 		}
 
 		default State stop() {
-			throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.INSTANCE));
+			throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.STOPPED));
 		}
 
 		default State terminate(AkkaRpcActor<?> akkaRpcActor) {
-			throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.INSTANCE));
+			throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.TERMINATING));
 		}
 
 		default State finishTermination() {
-			return TerminatedState.INSTANCE;
+			return TerminatedState.TERMINATED;
 		}
 
 		default boolean isRunning() {
@@ -488,16 +493,16 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
 	@SuppressWarnings("Singleton")
 	enum StartedState implements State {
-		INSTANCE;
+		STARTED;
 
 		@Override
 		public State start(AkkaRpcActor<?> akkaRpcActor) {
-			return INSTANCE;
+			return STARTED;
 		}
 
 		@Override
 		public State stop() {
-			return StoppedState.INSTANCE;
+			return StoppedState.STOPPED;
 		}
 
 		@Override
@@ -523,7 +528,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
 			terminationFuture.whenComplete((ignored, throwable) -> akkaRpcActor.stop(RpcEndpointTerminationResult.of(throwable)));
 
-			return TerminatingState.INSTANCE;
+			return TerminatingState.TERMINATING;
 		}
 
 		@Override
@@ -534,7 +539,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 
 	@SuppressWarnings("Singleton")
 	enum StoppedState implements State {
-		INSTANCE;
+		STOPPED;
 
 		@Override
 		public State start(AkkaRpcActor<?> akkaRpcActor) {
@@ -552,25 +557,25 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 				akkaRpcActor.mainThreadValidator.exitMainThread();
 			}
 
-			return StartedState.INSTANCE;
+			return StartedState.STARTED;
 		}
 
 		@Override
 		public State stop() {
-			return INSTANCE;
+			return STOPPED;
 		}
 
 		@Override
 		public State terminate(AkkaRpcActor<?> akkaRpcActor) {
 			akkaRpcActor.stop(RpcEndpointTerminationResult.success());
 
-			return TerminatingState.INSTANCE;
+			return TerminatingState.TERMINATING;
 		}
 	}
 
 	@SuppressWarnings("Singleton")
 	enum TerminatingState implements State {
-		INSTANCE;
+		TERMINATING;
 
 		@Override
 		public boolean isRunning() {
@@ -579,7 +584,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 	}
 
 	enum TerminatedState implements State {
-		INSTANCE
+		TERMINATED
 	}
 
 	private static final class RpcEndpointTerminationResult {