You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 11:28:39 UTC
flink git commit: [FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop
Repository: flink
Updated Branches:
refs/heads/master 6033de01a -> 51a278778
[FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActor#postStop
Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now
completed from the AkkaRpcActor#postStop method.
This closes #5266.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a27877
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a27877
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a27877
Branch: refs/heads/master
Commit: 51a278778c3536aa9f5030a8f43a7faea6889992
Parents: 6033de0
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 17:50:37 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 12:27:50 2018 +0100
----------------------------------------------------------------------
.../runtime/rpc/akka/AkkaBasedEndpoint.java | 11 +--
.../runtime/rpc/akka/AkkaInvocationHandler.java | 30 +++-----
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 24 +++---
.../flink/runtime/rpc/akka/AkkaRpcService.java | 79 +++-----------------
.../rpc/akka/FencedAkkaInvocationHandler.java | 3 +-
.../runtime/rpc/akka/FencedAkkaRpcActor.java | 6 +-
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 7 +-
7 files changed, 40 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java
index 499de1e..7493507 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaBasedEndpoint.java
@@ -22,10 +22,8 @@ import org.apache.flink.runtime.rpc.RpcGateway;
import akka.actor.ActorRef;
-import java.util.concurrent.CompletableFuture;
-
/**
- * Interface for Akka based rpc gateways
+ * Interface for Akka based rpc gateways.
*/
interface AkkaBasedEndpoint extends RpcGateway {
@@ -35,11 +33,4 @@ interface AkkaBasedEndpoint extends RpcGateway {
* @return the {@link ActorRef} of the underlying RPC actor
*/
ActorRef getActorRef();
-
- /**
- * Returns the internal termination future.
- *
- * @return Internal termination future
- */
- CompletableFuture<Void> getInternalTerminationFuture();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 37f46e3..863b780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -18,23 +18,24 @@
package org.apache.flink.runtime.rpc.akka;
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
-import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
-import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +50,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the
@@ -85,18 +86,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
@Nullable
private final CompletableFuture<Boolean> terminationFuture;
- // null if gateway; otherwise non-null
- @Nullable
- private final CompletableFuture<Void> internalTerminationFuture;
-
AkkaInvocationHandler(
String address,
String hostname,
ActorRef rpcEndpoint,
Time timeout,
long maximumFramesize,
- @Nullable CompletableFuture<Boolean> terminationFuture,
- @Nullable CompletableFuture<Void> internalTerminationFuture) {
+ @Nullable CompletableFuture<Boolean> terminationFuture) {
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
@@ -105,7 +101,6 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
this.timeout = Preconditions.checkNotNull(timeout);
this.maximumFramesize = maximumFramesize;
this.terminationFuture = terminationFuture;
- this.internalTerminationFuture = internalTerminationFuture;
}
@Override
@@ -159,7 +154,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
@Override
public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
- if(isLocal) {
+ if (isLocal) {
@SuppressWarnings("unchecked")
CompletableFuture<V> resultFuture = (CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);
@@ -208,7 +203,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
tell(rpcInvocation);
result = null;
- } else if (Objects.equals(returnType,CompletableFuture.class)) {
+ } else if (Objects.equals(returnType, CompletableFuture.class)) {
// execute an asynchronous call
result = ask(rpcInvocation, futureTimeout);
} else {
@@ -298,7 +293,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
}
/**
- * Checks whether any of the annotations is of type {@link RpcTimeout}
+ * Checks whether any of the annotations is of type {@link RpcTimeout}.
*
* @param annotations Array of annotations
* @return True if {@link RpcTimeout} was found; otherwise false
@@ -349,9 +344,4 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
public CompletableFuture<Boolean> getTerminationFuture() {
return terminationFuture;
}
-
- @Override
- public CompletableFuture<Void> getInternalTerminationFuture() {
- return internalTerminationFuture;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index f7488ab..da7ce35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -53,14 +53,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} and {@link CallAsync}
* {@link Processing} messages.
- * <p>
- * The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
+ *
+ * <p>The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
* instance.
- * <p>
- * The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
+ *
+ * <p>The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
* in the context of the actor thread.
- * <p>
- * The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
+ *
+ * <p>The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
* {@link Processing#START} starts processing incoming messages. A {@link Processing#STOP} message
* stops processing messages. All messages which arrive when the processing is stopped, will be
* discarded.
@@ -68,7 +68,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* @param <T> Type of the {@link RpcEndpoint}
*/
class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
-
+
protected final Logger log = LoggerFactory.getLogger(getClass());
/** the endpoint to invoke the methods on. */
@@ -77,12 +77,12 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
/** the helper that tracks whether calls come from the main thread. */
private final MainThreadValidatorUtil mainThreadValidator;
- private final CompletableFuture<Void> internalTerminationFuture;
+ private final CompletableFuture<Boolean> terminationFuture;
- AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> internalTerminationFuture) {
+ AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture) {
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
- this.internalTerminationFuture = checkNotNull(internalTerminationFuture);
+ this.terminationFuture = checkNotNull(terminationFuture);
}
@Override
@@ -106,9 +106,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
// Complete the termination future so that others know that we've stopped.
if (shutdownThrowable != null) {
- internalTerminationFuture.completeExceptionally(shutdownThrowable);
+ terminationFuture.completeExceptionally(shutdownThrowable);
} else {
- internalTerminationFuture.complete(null);
+ terminationFuture.complete(null);
}
} finally {
mainThreadValidator.exitMainThread();
http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 68b5aaa..7ff08f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -54,10 +54,8 @@ import javax.annotation.concurrent.ThreadSafe;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -65,7 +63,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import scala.Option;
@@ -165,7 +162,6 @@ public class AkkaRpcService implements RpcService {
actorRef,
timeout,
maximumFramesize,
- null,
null);
});
}
@@ -186,7 +182,6 @@ public class AkkaRpcService implements RpcService {
timeout,
maximumFramesize,
null,
- null,
() -> fencingToken);
});
}
@@ -196,13 +191,12 @@ public class AkkaRpcService implements RpcService {
checkNotNull(rpcEndpoint, "rpc endpoint");
CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>();
- CompletableFuture<Void> internalTerminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
- akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, internalTerminationFuture);
+ akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture);
} else {
- akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, internalTerminationFuture);
+ akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
}
ActorRef actorRef;
@@ -240,7 +234,6 @@ public class AkkaRpcService implements RpcService {
timeout,
maximumFramesize,
terminationFuture,
- internalTerminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
implementedRpcGateways.add(FencedMainThreadExecutable.class);
@@ -251,8 +244,7 @@ public class AkkaRpcService implements RpcService {
actorRef,
timeout,
maximumFramesize,
- terminationFuture,
- internalTerminationFuture);
+ terminationFuture);
}
// Rather than using the System ClassLoader directly, we derive the ClassLoader
@@ -280,7 +272,6 @@ public class AkkaRpcService implements RpcService {
timeout,
maximumFramesize,
null,
- null,
() -> fencingToken);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
@@ -300,43 +291,19 @@ public class AkkaRpcService implements RpcService {
@Override
public void stopServer(RpcServer selfGateway) {
if (selfGateway instanceof AkkaBasedEndpoint) {
- AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
+ final AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
+ final RpcEndpoint rpcEndpoint;
- boolean fromThisService;
synchronized (lock) {
if (stopped) {
return;
} else {
- fromThisService = actors.remove(akkaClient.getActorRef()) != null;
+ rpcEndpoint = actors.remove(akkaClient.getActorRef());
}
}
- if (fromThisService) {
- ActorRef selfActorRef = akkaClient.getActorRef();
- LOG.info("Trigger shut down of RPC endpoint {}.", selfGateway.getAddress());
-
- CompletableFuture<Boolean> akkaTerminationFuture = FutureUtils.toJava(
- Patterns.gracefulStop(
- selfActorRef,
- FutureUtils.toFiniteDuration(timeout),
- Kill.getInstance()));
-
- akkaTerminationFuture
- .thenCombine(
- akkaClient.getInternalTerminationFuture(),
- (Boolean terminated, Void ignored) -> true)
- .whenComplete(
- (Boolean terminated, Throwable throwable) -> {
- if (throwable != null) {
- LOG.debug("Graceful RPC endpoint shutdown failed. Shutting endpoint down hard now.", throwable);
-
- actorSystem.stop(selfActorRef);
- selfGateway.getTerminationFuture().completeExceptionally(throwable);
- } else {
- LOG.info("RPC endpoint {} has been shut down.", selfGateway.getAddress());
- selfGateway.getTerminationFuture().complete(null);
- }
- });
+ if (rpcEndpoint != null) {
+ akkaClient.getActorRef().tell(Kill.getInstance(), ActorRef.noSender());
} else {
LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfGateway.getAddress());
}
@@ -347,8 +314,6 @@ public class AkkaRpcService implements RpcService {
public void stopService() {
LOG.info("Stopping Akka RPC service.");
- final List<RpcEndpoint> actorsToTerminate;
-
synchronized (lock) {
if (stopped) {
return;
@@ -356,35 +321,13 @@ public class AkkaRpcService implements RpcService {
stopped = true;
- actorSystem.shutdown();
-
- actorsToTerminate = new ArrayList<>(actors.values());
-
- actors.clear();
}
+ actorSystem.shutdown();
actorSystem.awaitTermination();
- // complete the termination futures of all actors
- for (RpcEndpoint rpcEndpoint : actorsToTerminate) {
- final CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();
-
- AkkaBasedEndpoint akkaBasedEndpoint = rpcEndpoint.getSelfGateway(AkkaBasedEndpoint.class);
-
- CompletableFuture<Void> internalTerminationFuture = akkaBasedEndpoint.getInternalTerminationFuture();
-
- internalTerminationFuture.whenComplete(
- (Void ignored, Throwable throwable) -> {
- if (throwable != null) {
- terminationFuture.completeExceptionally(throwable);
- } else {
- terminationFuture.complete(true);
- }
- });
-
- // make sure that if the internal termination futures haven't completed yet, then they time out
- internalTerminationFuture.completeExceptionally(
- new TimeoutException("The RpcEndpoint " + rpcEndpoint.getAddress() + " did not terminate in time."));
+ synchronized (lock) {
+ actors.clear();
}
LOG.info("Stopped Akka RPC service.");
http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
index 03534ae..3ca75e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -61,9 +61,8 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Boolean> terminationFuture,
- @Nullable CompletableFuture<Void> internalTerminationFuture,
Supplier<F> fencingTokenSupplier) {
- super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, internalTerminationFuture);
+ super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index fa83e4f..57280fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
-import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
@@ -38,8 +38,8 @@ import java.util.concurrent.CompletableFuture;
*/
public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
- public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Void> internalTerminationFuture) {
- super(rpcEndpoint, internalTerminationFuture);
+ public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture) {
+ super(rpcEndpoint, terminationFuture);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/51a27877/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index a5c41ef..c4259f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -137,12 +137,9 @@ public class AkkaRpcServiceTest extends TestLogger {
/**
* Tests that we can wait for the termination of the rpc service
- *
- * @throws ExecutionException
- * @throws InterruptedException
*/
@Test(timeout = 60000)
- public void testTerminationFuture() throws ExecutionException, InterruptedException {
+ public void testTerminationFuture() throws Exception {
final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
@@ -150,7 +147,7 @@ public class AkkaRpcServiceTest extends TestLogger {
assertFalse(terminationFuture.isDone());
- CompletableFuture.runAsync(() -> rpcService.stopService(), actorSystem.dispatcher());
+ CompletableFuture.runAsync(rpcService::stopService, actorSystem.dispatcher());
terminationFuture.get();
}