You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/28 08:20:50 UTC
[07/50] [abbrv] flink git commit: [FLINK-4384] [rpc] Add
"scheduleRunAsync()" to the RpcEndpoint
[FLINK-4384] [rpc] Add "scheduleRunAsync()" to the RpcEndpoint
This closes #2360
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d31e6314
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d31e6314
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d31e6314
Branch: refs/heads/flip-6
Commit: d31e631414ac2e5d90e519c5e60b517f382c357b
Parents: 9e6ff60
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 11 19:10:48 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 19:24:55 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/rpc/MainThreadExecutor.java | 9 +
.../apache/flink/runtime/rpc/RpcEndpoint.java | 12 ++
.../runtime/rpc/akka/AkkaInvocationHandler.java | 13 +-
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 15 +-
.../runtime/rpc/akka/messages/RunAsync.java | 24 ++-
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 3 +
.../flink/runtime/rpc/akka/AsyncCallsTest.java | 216 +++++++++++++++++++
7 files changed, 286 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d31e6314/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index 882c1b7..4efb382 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -52,4 +52,13 @@ public interface MainThreadExecutor {
* @return Future of the callable result
*/
<V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout);
+
+ /**
+ * Execute the runnable in the main thread of the underlying RPC endpoint, with
+ * a delay of the given number of milliseconds.
+ *
+ * @param runnable Runnable to be executed
+ * @param delay The delay, in milliseconds, after which the runnable will be executed
+ */
+ void scheduleRunAsync(Runnable runnable, long delay);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d31e6314/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index aef0803..44933d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -28,6 +28,7 @@ import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -168,6 +169,17 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
}
/**
+ * Execute the runnable in the main thread of the underlying RPC endpoint, with
+ * a delay of the given duration, expressed in the given time unit.
+ *
+ * @param runnable Runnable to be executed
+ * @param delay The delay after which the runnable will be executed, expressed in {@code unit}
+ */
+ public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
+ ((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay));
+ }
+
+ /**
* Execute the callable in the main thread of the underlying RPC service, returning a future for
* the result of the callable. If the callable is not completed within the given timeout, then
* the future will be failed with a {@link java.util.concurrent.TimeoutException}.
http://git-wip-us.apache.org/repos/asf/flink/blob/d31e6314/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index e8e383a..580b161 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -38,6 +38,9 @@ import java.lang.reflect.Method;
import java.util.BitSet;
import java.util.concurrent.Callable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the
* rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
@@ -106,9 +109,17 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
@Override
public void runAsync(Runnable runnable) {
+ scheduleRunAsync(runnable, 0);
+ }
+
+ @Override
+ public void scheduleRunAsync(Runnable runnable, long delay) {
+ checkNotNull(runnable, "runnable");
+ checkArgument(delay >= 0, "delay must be zero or greater");
+
// Unfortunately I couldn't find a way to allow only local communication. Therefore, the
// runnable field is transient
- rpcServer.tell(new RunAsync(runnable), ActorRef.noSender());
+ rpcServer.tell(new RunAsync(runnable, delay), ActorRef.noSender());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d31e6314/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 57da38a..18ccf1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc.akka;
+import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
@@ -30,9 +31,11 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
/**
* Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and {@link CallAsync}
@@ -152,13 +155,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
"{} is only supported with local communication.",
runAsync.getClass().getName(),
runAsync.getClass().getName());
- } else {
+ }
+ else if (runAsync.getDelay() == 0) {
+ // run immediately
try {
runAsync.getRunnable().run();
} catch (final Throwable e) {
LOG.error("Caught exception while executing runnable in main thread.", e);
}
}
+ else {
+ // schedule for later. send a new message after the delay, which will then be immediately executed
+ FiniteDuration delay = new FiniteDuration(runAsync.getDelay(), TimeUnit.MILLISECONDS);
+ RunAsync message = new RunAsync(runAsync.getRunnable(), 0);
+
+ getContext().system().scheduler().scheduleOnce(delay, getSelf(), message,
+ getContext().dispatcher(), ActorRef.noSender());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/d31e6314/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
index fb95852..c18906c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -18,23 +18,39 @@
package org.apache.flink.runtime.rpc.akka.messages;
-import org.apache.flink.util.Preconditions;
-
import java.io.Serializable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Message for asynchronous runnable invocations
*/
public final class RunAsync implements Serializable {
private static final long serialVersionUID = -3080595100695371036L;
+ /** The runnable to be executed. Transient, so it gets lost upon serialization */
private final transient Runnable runnable;
- public RunAsync(Runnable runnable) {
- this.runnable = Preconditions.checkNotNull(runnable);
+ /** The delay, in milliseconds, after which the runnable should be called */
+ private final long delay;
+
+ /**
+ * Creates a new message that asks for the given runnable to be run after the given delay.
+ * @param runnable The Runnable to run.
+ * @param delay The delay in milliseconds. Zero indicates immediate execution.
+ */
+ public RunAsync(Runnable runnable, long delay) {
+ checkArgument(delay >= 0);
+ this.runnable = checkNotNull(runnable);
+ this.delay = delay;
}
public Runnable getRunnable() {
return runnable;
}
+
+ public long getDelay() {
+ return delay;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d31e6314/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index a4e1d7f..5e37e10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import akka.util.Timeout;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
http://git-wip-us.apache.org/repos/asf/flink/blob/d31e6314/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
new file mode 100644
index 0000000..f2ce52d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.*;
+
+public class AsyncCallsTest {
+
+ // ------------------------------------------------------------------------
+ // shared test members
+ // ------------------------------------------------------------------------
+
+ private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+ private static AkkaRpcService akkaRpcService =
+ new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+
+ @AfterClass
+ public static void shutdown() {
+ akkaRpcService.stopService();
+ actorSystem.shutdown();
+ }
+
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testScheduleWithNoDelay() throws Exception {
+
+ // lock and flag used to detect concurrent execution of the endpoint's calls and runnables
+ final ReentrantLock lock = new ReentrantLock();
+ final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+
+ TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+ TestGateway gateway = testEndpoint.getSelf();
+
+ // a bunch of gateway calls
+ gateway.someCall();
+ gateway.anotherCall();
+ gateway.someCall();
+
+ // run something asynchronously
+ for (int i = 0; i < 10000; i++) {
+ testEndpoint.runAsync(new Runnable() {
+ @Override
+ public void run() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ }
+ });
+ }
+
+ Future<String> result = testEndpoint.callAsync(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ return "test";
+ }
+ }, new Timeout(30, TimeUnit.SECONDS));
+ String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS));
+ assertEquals("test", str);
+
+ // validate that no concurrent access happened
+ assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
+ assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+ akkaRpcService.stopServer(testEndpoint.getSelf());
+ }
+
+ @Test
+ public void testScheduleWithDelay() throws Exception {
+
+ // lock and flag used to detect concurrent execution of the endpoint's calls and runnables
+ final ReentrantLock lock = new ReentrantLock();
+ final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+ final OneShotLatch latch = new OneShotLatch();
+
+ final long delay = 200;
+
+ TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+
+ // run something asynchronously
+ testEndpoint.runAsync(new Runnable() {
+ @Override
+ public void run() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ }
+ });
+
+ final long start = System.nanoTime();
+
+ testEndpoint.scheduleRunAsync(new Runnable() {
+ @Override
+ public void run() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ latch.trigger();
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+
+ latch.await();
+ final long stop = System.nanoTime();
+
+ // validate that no concurrent access happened
+ assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
+ assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+ assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
+ }
+
+ // ------------------------------------------------------------------------
+ // test RPC endpoint
+ // ------------------------------------------------------------------------
+
+ interface TestGateway extends RpcGateway {
+
+ void someCall();
+
+ void anotherCall();
+ }
+
+ @SuppressWarnings("unused")
+ public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+ private final ReentrantLock lock;
+
+ private volatile boolean concurrentAccess;
+
+ public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
+ super(rpcService);
+ this.lock = lock;
+ }
+
+ @RpcMethod
+ public void someCall() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess = true;
+ }
+ }
+
+ @RpcMethod
+ public void anotherCall() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess = true;
+ }
+ }
+
+ public boolean hasConcurrentAccess() {
+ return concurrentAccess;
+ }
+ }
+}