You are viewing a plain-text version of this content. The canonical version is linked from the original mailing-list archive page (the hyperlink was lost in the plain-text conversion).
Posted to commits@flink.apache.org by tr...@apache.org on 2020/03/31 10:44:05 UTC

[flink] 02/02: [FLINK-16703][rpc] Set AkkaRpcActor state to TERMINATING when terminating

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8773895b6b7cf75ee642ebd1496c0566a0ddaa40
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Mar 27 14:34:07 2020 +0100

    [FLINK-16703][rpc] Set AkkaRpcActor state to TERMINATING when terminating
    
    This commit fixes a bug where the state of the AkkaRpcActor was not updated
    when the actor was being terminated. Moreover, this commit fixes the problem that the
    onStop action could have been called multiple times. Last but not least, it
    changes the enum names of the state implementations for better diagnostics.
    
    This closes #11549.
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java       |  5 ++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java   | 64 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 0397172..7ad3776 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -578,6 +578,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
 		TERMINATING;
 
 		@Override
+		public State terminate(AkkaRpcActor<?> akkaRpcActor) {
+			return TERMINATING; // already terminating: ignore repeated terminate requests so that onStop is not triggered again (FLINK-16703)
+		}
+
+		@Override
 		public boolean isRunning() {
 			return true;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 0597b01..64270c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
@@ -395,6 +396,38 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that sending multiple TERMINATE messages to the actor won't trigger the onStop action multiple times.
+	 * Note that this test is probabilistic: without the fix it fails only sometimes, not on every run.
+	 * See FLINK-16703.
+	 */
+	@Test
+	public void callsOnStopOnlyOnce() throws Exception {
+		final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
+		final OnStopCountingRpcEndpoint endpoint = new OnStopCountingRpcEndpoint(akkaRpcService, onStopFuture);
+
+		try {
+			endpoint.start();
+
+			final AkkaBasedEndpoint selfGateway = endpoint.getSelfGateway(AkkaBasedEndpoint.class);
+
+			// send TERMINATE twice directly to the actor; the second message must not cause a second onStop call
+			selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
+			selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
+
+			endpoint.waitUntilOnStopHasBeenCalled();
+
+			onStopFuture.complete(null); // complete the future returned by onStop() so that the termination can finish
+
+			endpoint.getTerminationFuture().get();
+
+			assertThat(endpoint.getNumOnStopCalls(), is(1));
+		} finally {
+			onStopFuture.complete(null); // no-op if already completed; keeps termination from blocking if the test failed early
+			RpcUtils.terminateRpcEndpoint(endpoint, timeout);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Test Actors and Interfaces
 	// ------------------------------------------------------------------------
@@ -609,4 +642,35 @@ public class AkkaRpcActorTest extends TestLogger {
 			countDownLatch.await();
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	/** Test endpoint which counts its {@link #onStop()} calls and returns a caller-supplied future from them. */
+	private static final class OnStopCountingRpcEndpoint extends RpcEndpoint {
+		/** Number of times {@link #onStop()} has been invoked. */
+		private final AtomicInteger numOnStopCalls = new AtomicInteger(0);
+		/** Triggered by {@link #onStop()} so that a test can block until it has been called at least once. */
+		private final OneShotLatch onStopHasBeenCalled = new OneShotLatch();
+		/** Future returned by every {@link #onStop()} call; the test completes it to let the termination proceed. */
+		private final CompletableFuture<Void> onStopFuture;
+
+		private OnStopCountingRpcEndpoint(RpcService rpcService, CompletableFuture<Void> onStopFuture) {
+			super(rpcService);
+			this.onStopFuture = onStopFuture;
+		}
+
+		@Override
+		protected CompletableFuture<Void> onStop() {
+			onStopHasBeenCalled.trigger();
+			numOnStopCalls.incrementAndGet();
+			return onStopFuture;
+		}
+
+		private int getNumOnStopCalls() {
+			return numOnStopCalls.get();
+		}
+
+		private void waitUntilOnStopHasBeenCalled() throws InterruptedException {
+			onStopHasBeenCalled.await();
+		}
+	}
 }