You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/04/04 13:50:34 UTC

[2/2] flink git commit: [FLINK-9128] [flip6] Add support for scheduleRunAsync for FencedRpcEndpoints

[FLINK-9128] [flip6] Add support for scheduleRunAsync for FencedRpcEndpoints

Wrap self messages in the FencedRpcEndpoints in a LocalFencedMessage to not drop them
due to a missing fencing token.

This closes #5812.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/826d51d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/826d51d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/826d51d5

Branch: refs/heads/master
Commit: 826d51d5c4e2501bf1681d0809693576140876bd
Parents: 6ffe22d
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Apr 4 00:02:52 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Apr 4 15:49:52 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |  14 +-
 .../runtime/rpc/akka/FencedAkkaRpcActor.java    |   8 +
 .../flink/runtime/rpc/AsyncCallsTest.java       | 198 +++++++++++++------
 3 files changed, 154 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/826d51d5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 9f68ede..022dea3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -303,7 +303,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 				FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
 				RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
 
-				getContext().system().scheduler().scheduleOnce(delay, getSelf(), message,
+				final Object envelopedSelfMessage = envelopeSelfMessage(message);
+
+				getContext().system().scheduler().scheduleOnce(delay, getSelf(), envelopedSelfMessage,
 						getContext().dispatcher(), ActorRef.noSender());
 			}
 		}
@@ -332,4 +334,14 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 			getSender().tell(new Status.Failure(throwable), getSelf());
 		}
 	}
+
+	/**
+	 * Hook to envelope self messages.
+	 *
+	 * @param message to envelope
+	 * @return enveloped message
+	 */
+	protected Object envelopeSelfMessage(Object message) {
+		return message;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/826d51d5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 6096439..d4cc16e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.rpc.messages.FencedMessage;
+import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
 import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
 
 import java.io.Serializable;
@@ -92,4 +93,11 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
 				FencedMessage.class.getSimpleName() + " nor " + UnfencedMessage.class.getSimpleName() + '.'));
 		}
 	}
+
+	@Override
+	protected Object envelopeSelfMessage(Object message) {
+		final F fencingToken = rpcEndpoint.getFencingToken();
+
+		return new LocalFencedMessage<>(fencingToken, message);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/826d51d5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 5f5ba44..c194d44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -83,88 +83,99 @@ public class AsyncCallsTest extends TestLogger {
 
 	@Test
 	public void testScheduleWithNoDelay() throws Exception {
+		runScheduleWithNoDelayTest(TestEndpoint::new);
+	}
 
+	@Test
+	public void testFencedScheduleWithNoDelay() throws Exception {
+		runScheduleWithNoDelayTest(FencedTestEndpoint::new);
+	}
+
+	private void runScheduleWithNoDelayTest(RpcEndpointFactory factory) throws Exception {
 		// to collect all the thread references
 		final ReentrantLock lock = new ReentrantLock();
 		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
 
-		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
-		testEndpoint.start();
-		TestGateway gateway = testEndpoint.getSelfGateway(TestGateway.class);
+		RpcEndpoint rpcEndpoint = factory.create(akkaRpcService, lock, concurrentAccess);
+		rpcEndpoint.start();
 
-		// a bunch of gateway calls
-		gateway.someCall();
-		gateway.anotherCall();
-		gateway.someCall();
+		try {
+			TestGateway gateway = rpcEndpoint.getSelfGateway(TestGateway.class);
 
-		// run something asynchronously
-		for (int i = 0; i < 10000; i++) {
-			testEndpoint.runAsync(new Runnable() {
-				@Override
-				public void run() {
+			// a bunch of gateway calls
+			gateway.someCall();
+			gateway.anotherCall();
+			gateway.someCall();
+
+			// run something asynchronously
+			for (int i = 0; i < 10000; i++) {
+				rpcEndpoint.runAsync(() -> {
 					boolean holdsLock = lock.tryLock();
 					if (holdsLock) {
 						lock.unlock();
 					} else {
 						concurrentAccess.set(true);
 					}
-				}
-			});
-		}
-	
-		CompletableFuture<String> result = testEndpoint.callAsync(
-			() -> {
-				boolean holdsLock = lock.tryLock();
-				if (holdsLock) {
-					lock.unlock();
-				} else {
-					concurrentAccess.set(true);
-				}
-				return "test";
-			},
-			Time.seconds(30L));
+				});
+			}
 
-		String str = result.get(30, TimeUnit.SECONDS);
-		assertEquals("test", str);
+			CompletableFuture<String> result = rpcEndpoint.callAsync(
+				() -> {
+					boolean holdsLock = lock.tryLock();
+					if (holdsLock) {
+						lock.unlock();
+					} else {
+						concurrentAccess.set(true);
+					}
+					return "test";
+				},
+				Time.seconds(30L));
 
-		// validate that no concurrent access happened
-		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
-		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+			String str = result.get(30, TimeUnit.SECONDS);
+			assertEquals("test", str);
 
-		testEndpoint.shutDown();
+			// validate that no concurrent access happened
+			assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+		} finally {
+			rpcEndpoint.shutDown();
+		}
 	}
 
 	@Test
 	public void testScheduleWithDelay() throws Exception {
+		runScheduleWithDelayTest(TestEndpoint::new);
+	}
 
+	@Test
+	public void testFencedScheduleWithDelay() throws Exception {
+		runScheduleWithDelayTest(FencedTestEndpoint::new);
+	}
+
+	private void runScheduleWithDelayTest(RpcEndpointFactory factory) throws Exception {
 		// to collect all the thread references
 		final ReentrantLock lock = new ReentrantLock();
 		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
 		final OneShotLatch latch = new OneShotLatch();
 
-		final long delay = 100;
+		final long delay = 10L;
 
-		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
-		testEndpoint.start();
+		RpcEndpoint rpcEndpoint = factory.create(akkaRpcService, lock, concurrentAccess);
+		rpcEndpoint.start();
 
-		// run something asynchronously
-		testEndpoint.runAsync(new Runnable() {
-			@Override
-			public void run() {
+		try {
+			// run something asynchronously
+			rpcEndpoint.runAsync(() -> {
 				boolean holdsLock = lock.tryLock();
 				if (holdsLock) {
 					lock.unlock();
 				} else {
 					concurrentAccess.set(true);
 				}
-			}
-		});
+			});
 
-		final long start = System.nanoTime();
+			final long start = System.nanoTime();
 
-		testEndpoint.scheduleRunAsync(new Runnable() {
-			@Override
-			public void run() {
+			rpcEndpoint.scheduleRunAsync(() -> {
 				boolean holdsLock = lock.tryLock();
 				if (holdsLock) {
 					lock.unlock();
@@ -172,17 +183,23 @@ public class AsyncCallsTest extends TestLogger {
 					concurrentAccess.set(true);
 				}
 				latch.trigger();
-			}
-		}, delay, TimeUnit.MILLISECONDS);
+			}, delay, TimeUnit.MILLISECONDS);
 
-		latch.await();
-		final long stop = System.nanoTime();
+			latch.await();
+			final long stop = System.nanoTime();
 
-		// validate that no concurrent access happened
-		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
-		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+			// validate that no concurrent access happened
+			assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+			assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay);
+		} finally {
+			RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
+		}
+	}
 
-		assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay);
+	@FunctionalInterface
+	private interface RpcEndpointFactory {
+		RpcEndpoint create(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess);
 	}
 
 	/**
@@ -321,16 +338,16 @@ public class AsyncCallsTest extends TestLogger {
 		void anotherCall();
 	}
 
-	@SuppressWarnings("unused")
-	public static class TestEndpoint extends RpcEndpoint implements TestGateway {
+	private static class TestEndpoint extends RpcEndpoint implements TestGateway {
 
 		private final ReentrantLock lock;
 
-		private volatile boolean concurrentAccess;
+		private final AtomicBoolean concurrentAccess;
 
-		public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
+		TestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess) {
 			super(rpcService);
 			this.lock = lock;
+			this.concurrentAccess = concurrentAccess;
 		}
 
 		@Override
@@ -339,7 +356,7 @@ public class AsyncCallsTest extends TestLogger {
 			if (holdsLock) {
 				lock.unlock();
 			} else {
-				concurrentAccess = true;
+				concurrentAccess.set(true);
 			}
 		}
 
@@ -349,36 +366,67 @@ public class AsyncCallsTest extends TestLogger {
 			if (holdsLock) {
 				lock.unlock();
 			} else {
-				concurrentAccess = true;
+				concurrentAccess.set(true);
 			}
 		}
 
-		public boolean hasConcurrentAccess() {
-			return concurrentAccess;
-		}
-
 		@Override
 		public CompletableFuture<Void> postStop() {
 			return CompletableFuture.completedFuture(null);
 		}
 	}
 
-	public interface FencedTestGateway extends FencedRpcGateway<UUID> {
+	public interface FencedTestGateway extends FencedRpcGateway<UUID>, TestGateway {
 		CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, @RpcTimeout Time timeout);
 	}
 
 	public static class FencedTestEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestGateway {
 
+		private final ReentrantLock lock;
+		private final AtomicBoolean concurrentAccess;
+
 		private final OneShotLatch enteringSetNewFencingToken;
 		private final OneShotLatch triggerSetNewFencingToken;
 
 		protected FencedTestEndpoint(
 				RpcService rpcService,
+				ReentrantLock lock,
+				AtomicBoolean concurrentAccess) {
+			this(
+				rpcService,
+				lock,
+				concurrentAccess,
+				UUID.randomUUID(),
+				new OneShotLatch(),
+				new OneShotLatch());
+		}
+
+		protected FencedTestEndpoint(
+				RpcService rpcService,
+				UUID initialFencingToken,
+				OneShotLatch enteringSetNewFencingToken,
+				OneShotLatch triggerSetNewFencingToken) {
+			this(
+				rpcService,
+				new ReentrantLock(),
+				new AtomicBoolean(false),
+				initialFencingToken,
+				enteringSetNewFencingToken,
+				triggerSetNewFencingToken);
+		}
+
+		private FencedTestEndpoint(
+				RpcService rpcService,
+				ReentrantLock lock,
+				AtomicBoolean concurrentAccess,
 				UUID initialFencingToken,
 				OneShotLatch enteringSetNewFencingToken,
 				OneShotLatch triggerSetNewFencingToken) {
 			super(rpcService);
 
+			this.lock = lock;
+			this.concurrentAccess = concurrentAccess;
+
 			this.enteringSetNewFencingToken = enteringSetNewFencingToken;
 			this.triggerSetNewFencingToken = triggerSetNewFencingToken;
 
@@ -410,5 +458,25 @@ public class AsyncCallsTest extends TestLogger {
 		public CompletableFuture<Void> postStop() {
 			return CompletableFuture.completedFuture(null);
 		}
+
+		@Override
+		public void someCall() {
+			boolean holdsLock = lock.tryLock();
+			if (holdsLock) {
+				lock.unlock();
+			} else {
+				concurrentAccess.set(true);
+			}
+		}
+
+		@Override
+		public void anotherCall() {
+			boolean holdsLock = lock.tryLock();
+			if (holdsLock) {
+				lock.unlock();
+			} else {
+				concurrentAccess.set(true);
+			}
+		}
 	}
 }