You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:40:48 UTC
[21/50] [abbrv] flink git commit: [FLINK-4694] [rpc] Add termination
futures to RpcEndpoint and RpcService
[FLINK-4694] [rpc] Add termination futures to RpcEndpoint and RpcService
The termination futures can be used to wait for the termination of the respective component.
This closes #2558.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a9e6447
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a9e6447
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a9e6447
Branch: refs/heads/flip-6
Commit: 1a9e6447dbb63d138f97b55c15d1158b66e32286
Parents: 8adceed
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Sep 27 18:17:42 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 1 09:39:30 2016 +0100
----------------------------------------------------------------------
.../concurrent/impl/FlinkCompletableFuture.java | 9 ++---
.../apache/flink/runtime/rpc/RpcEndpoint.java | 9 +++++
.../apache/flink/runtime/rpc/RpcService.java | 7 ++++
.../apache/flink/runtime/rpc/SelfGateway.java | 34 ++++++++++++++++++
.../runtime/rpc/akka/AkkaInvocationHandler.java | 22 ++++++++++--
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 17 ++++++++-
.../flink/runtime/rpc/akka/AkkaRpcService.java | 32 +++++++++++++++--
.../runtime/rpc/TestingSerialRpcService.java | 10 +++++-
.../runtime/rpc/akka/AkkaRpcActorTest.java | 36 ++++++++++++++++++++
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 29 ++++++++++++++++
10 files changed, 193 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
index c8b86ed..14686d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.concurrent.impl;
import akka.dispatch.Futures;
import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.util.Preconditions;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
@@ -63,10 +62,12 @@ public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements Complet
@Override
public boolean completeExceptionally(Throwable t) {
- Preconditions.checkNotNull(t);
-
try {
- promise.failure(t);
+ if (t == null) {
+ promise.failure(new NullPointerException("Throwable was null."));
+ } else {
+ promise.failure(t);
+ }
return true;
} catch (IllegalStateException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 79961f7..f93a2e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -173,6 +173,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
return rpcService;
}
+ /**
+ * Return a future which is completed when the rpc endpoint has been terminated.
+ *
+ * @return Future which is completed when the rpc endpoint has been terminated.
+ */
+ public Future<Void> getTerminationFuture() {
+ return ((SelfGateway)self).getTerminationFuture();
+ }
+
// ------------------------------------------------------------------------
// Asynchronous executions
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 96844ed..2052f98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -77,6 +77,13 @@ public interface RpcService {
void stopService();
/**
+ * Returns a future indicating when the RPC service has been shut down.
+ *
+ * @return Termination future
+ */
+ Future<Void> getTerminationFuture();
+
+ /**
* Gets the executor, provided by this RPC service. This executor can be used for example for
* the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
new file mode 100644
index 0000000..ed8ef9d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.runtime.concurrent.Future;
+
+/**
+ * Interface for self gateways
+ */
+public interface SelfGateway {
+
+ /**
+ * Return a future which is completed when the rpc endpoint has been terminated.
+ *
+ * @return Future indicating when the rpc endpoint has been terminated
+ */
+ Future<Void> getTerminationFuture();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 8f4deff..709ff92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
@@ -52,7 +53,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
* executed.
*/
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable, SelfGateway {
private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class);
private final String address;
@@ -67,12 +68,22 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
private final long maximumFramesize;
- AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time timeout, long maximumFramesize) {
+ // null if gateway; otherwise non-null
+ private final Future<Void> terminationFuture;
+
+ AkkaInvocationHandler(
+ String address,
+ ActorRef rpcEndpoint,
+ Time timeout,
+ long maximumFramesize,
+ Future<Void> terminationFuture) {
+
this.address = Preconditions.checkNotNull(address);
this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint);
this.isLocal = this.rpcEndpoint.path().address().hasLocalScope();
this.timeout = Preconditions.checkNotNull(timeout);
this.maximumFramesize = maximumFramesize;
+ this.terminationFuture = terminationFuture;
}
@Override
@@ -83,7 +94,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) ||
declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) ||
- declaringClass.equals(RpcGateway.class)) {
+ declaringClass.equals(RpcGateway.class) || declaringClass.equals(SelfGateway.class)) {
result = method.invoke(this, args);
} else {
String methodName = method.getName();
@@ -300,4 +311,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
public String getAddress() {
return address;
}
+
+ @Override
+ public Future<Void> getTerminationFuture() {
+ return terminationFuture;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 1b456a7..c21383a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -24,6 +24,7 @@ import akka.actor.UntypedActorWithStash;
import akka.dispatch.Futures;
import akka.japi.Procedure;
import akka.pattern.Patterns;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
@@ -76,9 +77,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
/** the helper that tracks whether calls come from the main thread */
private final MainThreadValidatorUtil mainThreadValidator;
- AkkaRpcActor(final T rpcEndpoint) {
+ private final CompletableFuture<Void> terminationFuture;
+
+ AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> terminationFuture) {
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
+ this.terminationFuture = checkNotNull(terminationFuture);
+ }
+
+ @Override
+ public void postStop() {
+ super.postStop();
+
+ // IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise
+ // we would complete the future and let the actor system restart the actor with a completed
+ // future.
+ // Complete the termination future so that others know that we've stopped.
+ terminationFuture.complete(null);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index fb7896a..44719c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -32,9 +32,12 @@ import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
@@ -131,7 +134,12 @@ public class AkkaRpcService implements RpcService {
final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
- InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+ InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
+ address,
+ actorRef,
+ timeout,
+ maximumFramesize,
+ null);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -156,7 +164,8 @@ public class AkkaRpcService implements RpcService {
public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
checkNotNull(rpcEndpoint, "rpc endpoint");
- Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint);
+ CompletableFuture<Void> terminationFuture = new FlinkCompletableFuture<>();
+ Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
ActorRef actorRef;
synchronized (lock) {
@@ -169,7 +178,12 @@ public class AkkaRpcService implements RpcService {
final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
- InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+ InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
+ address,
+ actorRef,
+ timeout,
+ maximumFramesize,
+ terminationFuture);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
@@ -181,6 +195,7 @@ public class AkkaRpcService implements RpcService {
classLoader,
new Class<?>[]{
rpcEndpoint.getSelfGatewayType(),
+ SelfGateway.class,
MainThreadExecutable.class,
StartStoppable.class,
AkkaGateway.class},
@@ -231,6 +246,17 @@ public class AkkaRpcService implements RpcService {
}
@Override
+ public Future<Void> getTerminationFuture() {
+ return FlinkFuture.supplyAsync(new Callable<Void>(){
+ @Override
+ public Void call() throws Exception {
+ actorSystem.awaitTermination();
+ return null;
+ }
+ }, getExecutor());
+ }
+
+ @Override
public Executor getExecutor() {
return actorSystem.dispatcher();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 2a004c5..88906a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc;
import akka.dispatch.Futures;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.util.DirectExecutorService;
@@ -39,7 +40,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
-
/**
* An RPC Service implementation for testing. This RPC service directly executes all asynchronous
* calls one by one in the calling thread.
@@ -48,10 +48,12 @@ public class TestingSerialRpcService implements RpcService {
private final DirectExecutorService executorService;
private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
+ private final CompletableFuture<Void> terminationFuture;
public TestingSerialRpcService() {
executorService = new DirectExecutorService();
this.registeredConnections = new ConcurrentHashMap<>(16);
+ this.terminationFuture = new FlinkCompletableFuture<>();
}
@Override
@@ -89,6 +91,12 @@ public class TestingSerialRpcService implements RpcService {
public void stopService() {
executorService.shutdown();
registeredConnections.clear();
+ terminationFuture.complete(null);
+ }
+
+ @Override
+ public Future<Void> getTerminationFuture() {
+ return terminationFuture;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 5d76024..ba8eb11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,6 +22,8 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
@@ -32,9 +34,15 @@ import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Test;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -148,6 +156,34 @@ public class AkkaRpcActorTest extends TestLogger {
}
}
+ /**
+ * Tests that we can wait for a RpcEndpoint to terminate.
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ @Test(timeout=1000)
+ public void testRpcEndpointTerminationFuture() throws ExecutionException, InterruptedException {
+ final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+ rpcEndpoint.start();
+
+ Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+
+ assertFalse(terminationFuture.isDone());
+
+ FlinkFuture.supplyAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ rpcEndpoint.shutDown();
+
+ return null;
+ }
+ }, actorSystem.dispatcher());
+
+ // wait until the rpc endpoint has terminated
+ terminationFuture.get();
+ }
+
private interface DummyRpcGateway extends RpcGateway {
Future<Integer> foobar();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a9e6447/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 3388011..7c8defa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class AkkaRpcServiceTest extends TestLogger {
@@ -120,4 +122,31 @@ public class AkkaRpcServiceTest extends TestLogger {
public void testGetAddress() {
assertEquals(AkkaUtils.getAddress(actorSystem).host().get(), akkaRpcService.getAddress());
}
+
+ /**
+ * Tests that we can wait for the termination of the rpc service
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ @Test(timeout = 1000)
+ public void testTerminationFuture() throws ExecutionException, InterruptedException {
+ final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+ final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
+
+ Future<Void> terminationFuture = rpcService.getTerminationFuture();
+
+ assertFalse(terminationFuture.isDone());
+
+ FlinkFuture.supplyAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ rpcService.stopService();
+
+ return null;
+ }
+ }, actorSystem.dispatcher());
+
+ terminationFuture.get();
+ }
}