You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/04/04 13:50:34 UTC
[2/2] flink git commit: [FLINK-9128] [flip6] Add support for scheduleRunAsync for FencedRpcEndpoints
[FLINK-9128] [flip6] Add support for scheduleRunAsync for FencedRpcEndpoints
Self messages scheduled via scheduleRunAsync are now wrapped in a LocalFencedMessage that carries
the endpoint's current fencing token, so that FencedRpcEndpoints no longer drop them due to a
missing fencing token.
This closes #5812.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/826d51d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/826d51d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/826d51d5
Branch: refs/heads/master
Commit: 826d51d5c4e2501bf1681d0809693576140876bd
Parents: 6ffe22d
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Apr 4 00:02:52 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Apr 4 15:49:52 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 14 +-
.../runtime/rpc/akka/FencedAkkaRpcActor.java | 8 +
.../flink/runtime/rpc/AsyncCallsTest.java | 198 +++++++++++++------
3 files changed, 154 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/826d51d5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 9f68ede..022dea3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -303,7 +303,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
- getContext().system().scheduler().scheduleOnce(delay, getSelf(), message,
+ final Object envelopedSelfMessage = envelopeSelfMessage(message);
+
+ getContext().system().scheduler().scheduleOnce(delay, getSelf(), envelopedSelfMessage,
getContext().dispatcher(), ActorRef.noSender());
}
}
@@ -332,4 +334,14 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
getSender().tell(new Status.Failure(throwable), getSelf());
}
}
+
+ /**
+ * Hook to envelope self messages.
+ *
+ * @param message to envelope
+ * @return enveloped message
+ */
+ protected Object envelopeSelfMessage(Object message) {
+ return message;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/826d51d5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index 6096439..d4cc16e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
+import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
import java.io.Serializable;
@@ -92,4 +93,11 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
FencedMessage.class.getSimpleName() + " nor " + UnfencedMessage.class.getSimpleName() + '.'));
}
}
+
+ @Override
+ protected Object envelopeSelfMessage(Object message) {
+ final F fencingToken = rpcEndpoint.getFencingToken();
+
+ return new LocalFencedMessage<>(fencingToken, message);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/826d51d5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 5f5ba44..c194d44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -83,88 +83,99 @@ public class AsyncCallsTest extends TestLogger {
@Test
public void testScheduleWithNoDelay() throws Exception {
+ runScheduleWithNoDelayTest(TestEndpoint::new);
+ }
+ @Test
+ public void testFencedScheduleWithNoDelay() throws Exception {
+ runScheduleWithNoDelayTest(FencedTestEndpoint::new);
+ }
+
+ private void runScheduleWithNoDelayTest(RpcEndpointFactory factory) throws Exception {
// to collect all the thread references
final ReentrantLock lock = new ReentrantLock();
final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
- TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
- testEndpoint.start();
- TestGateway gateway = testEndpoint.getSelfGateway(TestGateway.class);
+ RpcEndpoint rpcEndpoint = factory.create(akkaRpcService, lock, concurrentAccess);
+ rpcEndpoint.start();
- // a bunch of gateway calls
- gateway.someCall();
- gateway.anotherCall();
- gateway.someCall();
+ try {
+ TestGateway gateway = rpcEndpoint.getSelfGateway(TestGateway.class);
- // run something asynchronously
- for (int i = 0; i < 10000; i++) {
- testEndpoint.runAsync(new Runnable() {
- @Override
- public void run() {
+ // a bunch of gateway calls
+ gateway.someCall();
+ gateway.anotherCall();
+ gateway.someCall();
+
+ // run something asynchronously
+ for (int i = 0; i < 10000; i++) {
+ rpcEndpoint.runAsync(() -> {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
} else {
concurrentAccess.set(true);
}
- }
- });
- }
-
- CompletableFuture<String> result = testEndpoint.callAsync(
- () -> {
- boolean holdsLock = lock.tryLock();
- if (holdsLock) {
- lock.unlock();
- } else {
- concurrentAccess.set(true);
- }
- return "test";
- },
- Time.seconds(30L));
+ });
+ }
- String str = result.get(30, TimeUnit.SECONDS);
- assertEquals("test", str);
+ CompletableFuture<String> result = rpcEndpoint.callAsync(
+ () -> {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ return "test";
+ },
+ Time.seconds(30L));
- // validate that no concurrent access happened
- assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
- assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+ String str = result.get(30, TimeUnit.SECONDS);
+ assertEquals("test", str);
- testEndpoint.shutDown();
+ // validate that no concurrent access happened
+ assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+ } finally {
+ rpcEndpoint.shutDown();
+ }
}
@Test
public void testScheduleWithDelay() throws Exception {
+ runScheduleWithDelayTest(TestEndpoint::new);
+ }
+ @Test
+ public void testFencedScheduleWithDelay() throws Exception {
+ runScheduleWithDelayTest(FencedTestEndpoint::new);
+ }
+
+ private void runScheduleWithDelayTest(RpcEndpointFactory factory) throws Exception {
// to collect all the thread references
final ReentrantLock lock = new ReentrantLock();
final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
final OneShotLatch latch = new OneShotLatch();
- final long delay = 100;
+ final long delay = 10L;
- TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
- testEndpoint.start();
+ RpcEndpoint rpcEndpoint = factory.create(akkaRpcService, lock, concurrentAccess);
+ rpcEndpoint.start();
- // run something asynchronously
- testEndpoint.runAsync(new Runnable() {
- @Override
- public void run() {
+ try {
+ // run something asynchronously
+ rpcEndpoint.runAsync(() -> {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
} else {
concurrentAccess.set(true);
}
- }
- });
+ });
- final long start = System.nanoTime();
+ final long start = System.nanoTime();
- testEndpoint.scheduleRunAsync(new Runnable() {
- @Override
- public void run() {
+ rpcEndpoint.scheduleRunAsync(() -> {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
@@ -172,17 +183,23 @@ public class AsyncCallsTest extends TestLogger {
concurrentAccess.set(true);
}
latch.trigger();
- }
- }, delay, TimeUnit.MILLISECONDS);
+ }, delay, TimeUnit.MILLISECONDS);
- latch.await();
- final long stop = System.nanoTime();
+ latch.await();
+ final long stop = System.nanoTime();
- // validate that no concurrent access happened
- assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
- assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+ // validate that no concurrent access happened
+ assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+ assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay);
+ } finally {
+ RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
+ }
+ }
- assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay);
+ @FunctionalInterface
+ private interface RpcEndpointFactory {
+ RpcEndpoint create(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess);
}
/**
@@ -321,16 +338,16 @@ public class AsyncCallsTest extends TestLogger {
void anotherCall();
}
- @SuppressWarnings("unused")
- public static class TestEndpoint extends RpcEndpoint implements TestGateway {
+ private static class TestEndpoint extends RpcEndpoint implements TestGateway {
private final ReentrantLock lock;
- private volatile boolean concurrentAccess;
+ private final AtomicBoolean concurrentAccess;
- public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
+ TestEndpoint(RpcService rpcService, ReentrantLock lock, AtomicBoolean concurrentAccess) {
super(rpcService);
this.lock = lock;
+ this.concurrentAccess = concurrentAccess;
}
@Override
@@ -339,7 +356,7 @@ public class AsyncCallsTest extends TestLogger {
if (holdsLock) {
lock.unlock();
} else {
- concurrentAccess = true;
+ concurrentAccess.set(true);
}
}
@@ -349,36 +366,67 @@ public class AsyncCallsTest extends TestLogger {
if (holdsLock) {
lock.unlock();
} else {
- concurrentAccess = true;
+ concurrentAccess.set(true);
}
}
- public boolean hasConcurrentAccess() {
- return concurrentAccess;
- }
-
@Override
public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null);
}
}
- public interface FencedTestGateway extends FencedRpcGateway<UUID> {
+ public interface FencedTestGateway extends FencedRpcGateway<UUID>, TestGateway {
CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, @RpcTimeout Time timeout);
}
public static class FencedTestEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestGateway {
+ private final ReentrantLock lock;
+ private final AtomicBoolean concurrentAccess;
+
private final OneShotLatch enteringSetNewFencingToken;
private final OneShotLatch triggerSetNewFencingToken;
protected FencedTestEndpoint(
RpcService rpcService,
+ ReentrantLock lock,
+ AtomicBoolean concurrentAccess) {
+ this(
+ rpcService,
+ lock,
+ concurrentAccess,
+ UUID.randomUUID(),
+ new OneShotLatch(),
+ new OneShotLatch());
+ }
+
+ protected FencedTestEndpoint(
+ RpcService rpcService,
+ UUID initialFencingToken,
+ OneShotLatch enteringSetNewFencingToken,
+ OneShotLatch triggerSetNewFencingToken) {
+ this(
+ rpcService,
+ new ReentrantLock(),
+ new AtomicBoolean(false),
+ initialFencingToken,
+ enteringSetNewFencingToken,
+ triggerSetNewFencingToken);
+ }
+
+ private FencedTestEndpoint(
+ RpcService rpcService,
+ ReentrantLock lock,
+ AtomicBoolean concurrentAccess,
UUID initialFencingToken,
OneShotLatch enteringSetNewFencingToken,
OneShotLatch triggerSetNewFencingToken) {
super(rpcService);
+ this.lock = lock;
+ this.concurrentAccess = concurrentAccess;
+
this.enteringSetNewFencingToken = enteringSetNewFencingToken;
this.triggerSetNewFencingToken = triggerSetNewFencingToken;
@@ -410,5 +458,25 @@ public class AsyncCallsTest extends TestLogger {
public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public void someCall() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ }
+
+ @Override
+ public void anotherCall() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ }
}
}