You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:07 UTC
[02/50] [abbrv] flink git commit: [FLINK-4392] [rpc] Make RPC Service
thread-safe
[FLINK-4392] [rpc] Make RPC Service thread-safe
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29fadada
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29fadada
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29fadada
Branch: refs/heads/flip-6
Commit: 29fadada67b74876c5db5660c9885310cc756b21
Parents: dd4ad2d
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Aug 13 19:11:47 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:40 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/rpc/akka/AkkaGateway.java | 3 +-
.../flink/runtime/rpc/akka/AkkaRpcService.java | 92 +++++++++++++++-----
2 files changed, 70 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/29fadada/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
index a826e7d..ec3091c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
@@ -19,11 +19,12 @@
package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorRef;
+import org.apache.flink.runtime.rpc.RpcGateway;
/**
* Interface for Akka based rpc gateways
*/
-interface AkkaGateway {
+interface AkkaGateway extends RpcGateway {
ActorRef getRpcServer();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/29fadada/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 17983d0..448216c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -28,47 +28,61 @@ import akka.actor.Props;
import akka.dispatch.Mapper;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
+
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.rpc.MainThreadExecutor;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.Future;
+import javax.annotation.concurrent.ThreadSafe;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
-import java.util.Collection;
import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
- * Akka based {@link RpcService} implementation. The rpc service starts an Akka actor to receive
- * rpcs from a {@link RpcGateway}.
+ * Akka based {@link RpcService} implementation. The RPC service starts an Akka actor to receive
+ * RPC invocations from a {@link RpcGateway}.
*/
+@ThreadSafe
public class AkkaRpcService implements RpcService {
+
private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);
+ private final Object lock = new Object();
+
private final ActorSystem actorSystem;
private final Timeout timeout;
- private final Collection<ActorRef> actors = new HashSet<>(4);
+ private final Set<ActorRef> actors = new HashSet<>(4);
+
+ private volatile boolean stopped;
public AkkaRpcService(final ActorSystem actorSystem, final Timeout timeout) {
- this.actorSystem = Preconditions.checkNotNull(actorSystem, "actor system");
- this.timeout = Preconditions.checkNotNull(timeout, "timeout");
+ this.actorSystem = checkNotNull(actorSystem, "actor system");
+ this.timeout = checkNotNull(timeout, "timeout");
}
+ // this method does not mutate state and is thus thread-safe
@Override
public <C extends RpcGateway> Future<C> connect(final String address, final Class<C> clazz) {
- LOG.info("Try to connect to remote rpc server with address {}. Returning a {} gateway.", address, clazz.getName());
+ checkState(!stopped, "RpcService is stopped");
- final ActorSelection actorSel = actorSystem.actorSelection(address);
+ LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
+ address, clazz.getName());
+ final ActorSelection actorSel = actorSystem.actorSelection(address);
final AskableActorSelection asker = new AskableActorSelection(actorSel);
final Future<Object> identify = asker.ask(new Identify(42), timeout);
-
return identify.map(new Mapper<Object, C>(){
@Override
public C apply(Object obj) {
@@ -89,20 +103,29 @@ public class AkkaRpcService implements RpcService {
@Override
public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
- Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");
-
- LOG.info("Start Akka rpc actor to handle rpcs for {}.", rpcEndpoint.getClass().getName());
+ checkNotNull(rpcEndpoint, "rpc endpoint");
Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint);
+ ActorRef actorRef;
+
+ synchronized (lock) {
+ checkState(!stopped, "RpcService is stopped");
+ actorRef = actorSystem.actorOf(akkaRpcActorProps);
+ actors.add(actorRef);
+ }
- ActorRef actorRef = actorSystem.actorOf(akkaRpcActorProps);
- actors.add(actorRef);
+ LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout);
+ // Rather than using the System ClassLoader directly, we derive the ClassLoader
+ // from this class. That works better in cases where Flink runs embedded and all Flink
+ // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader.
+ ClassLoader classLoader = getClass().getClassLoader();
+
@SuppressWarnings("unchecked")
C self = (C) Proxy.newProxyInstance(
- ClassLoader.getSystemClassLoader(),
+ classLoader,
new Class<?>[]{rpcEndpoint.getSelfGatewayType(), MainThreadExecutor.class, AkkaGateway.class},
akkaInvocationHandler);
@@ -110,35 +133,56 @@ public class AkkaRpcService implements RpcService {
}
@Override
- public <C extends RpcGateway> void stopServer(C selfGateway) {
+ public void stopServer(RpcGateway selfGateway) {
if (selfGateway instanceof AkkaGateway) {
AkkaGateway akkaClient = (AkkaGateway) selfGateway;
- if (actors.contains(akkaClient.getRpcServer())) {
- ActorRef selfActorRef = akkaClient.getRpcServer();
-
- LOG.info("Stop Akka rpc actor {}.", selfActorRef.path());
+ boolean fromThisService;
+ synchronized (lock) {
+ if (stopped) {
+ return;
+ } else {
+ fromThisService = actors.remove(akkaClient.getRpcServer());
+ }
+ }
+ if (fromThisService) {
+ ActorRef selfActorRef = akkaClient.getRpcServer();
+ LOG.info("Stopping RPC endpoint {}.", selfActorRef.path());
selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ } else {
+ LOG.debug("RPC endpoint {} already stopped or from different RPC service", akkaClient.getRpcServer().path());
}
}
}
@Override
public void stopService() {
- LOG.info("Stop Akka rpc service.");
- actorSystem.shutdown();
+ LOG.info("Stopping Akka RPC service.");
+
+ synchronized (lock) {
+ if (stopped) {
+ return;
+ }
+
+ stopped = true;
+ actorSystem.shutdown();
+ actors.clear();
+ }
+
actorSystem.awaitTermination();
}
@Override
public <C extends RpcGateway> String getAddress(C selfGateway) {
+ checkState(!stopped, "RpcService is stopped");
+
if (selfGateway instanceof AkkaGateway) {
ActorRef actorRef = ((AkkaGateway) selfGateway).getRpcServer();
return AkkaUtils.getAkkaURL(actorSystem, actorRef);
} else {
String className = AkkaGateway.class.getName();
- throw new RuntimeException("Cannot get address for non " + className + '.');
+ throw new IllegalArgumentException("Cannot get address for non " + className + '.');
}
}
}