You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:54 UTC

[21/50] [abbrv] flink git commit: [FLINK-4384] [rpc] Add "scheduleRunAsync()" to the RpcEndpoint

[FLINK-4384] [rpc] Add "scheduleRunAsync()" to the RpcEndpoint

This closes #2360


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5614a4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5614a4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5614a4c

Branch: refs/heads/flip-6
Commit: f5614a4c352692e39218de866ad68331b28fb7fe
Parents: f5cf6b5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 11 19:10:48 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 21 11:39:13 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/MainThreadExecutor.java   |   9 +
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  12 ++
 .../runtime/rpc/akka/AkkaInvocationHandler.java |  13 +-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |  15 +-
 .../runtime/rpc/akka/messages/RunAsync.java     |  24 ++-
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   3 +
 .../flink/runtime/rpc/akka/AsyncCallsTest.java  | 216 +++++++++++++++++++
 7 files changed, 286 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index 882c1b7..4efb382 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -52,4 +52,13 @@ public interface MainThreadExecutor {
 	 * @return Future of the callable result
 	 */
 	<V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout);
+
+	/**
+	 * Execute the runnable in the main thread of the underlying RPC endpoint, with
+	 * a delay of the given number of milliseconds.
+	 *
+	 * @param runnable Runnable to be executed
+	 * @param delay    The delay, in milliseconds, after which the runnable will be executed
+	 */
+	void scheduleRunAsync(Runnable runnable, long delay);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index aef0803..44933d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -28,6 +28,7 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -168,6 +169,17 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	}
 
 	/**
+	 * Execute the runnable in the main thread of the underlying RPC endpoint, with
+	 * a delay of the given number of milliseconds.
+	 *
+	 * @param runnable Runnable to be executed
+	 * @param delay    The delay after which the runnable will be executed
+	 */
+	public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
+		((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay));
+	}
+
+	/**
 	 * Execute the callable in the main thread of the underlying RPC service, returning a future for
 	 * the result of the callable. If the callable is not completed within the given timeout, then
 	 * the future will be failed with a {@link java.util.concurrent.TimeoutException}.

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index e8e383a..580b161 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -38,6 +38,9 @@ import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.concurrent.Callable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the
  * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
@@ -106,9 +109,17 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 	@Override
 	public void runAsync(Runnable runnable) {
+		scheduleRunAsync(runnable, 0);
+	}
+
+	@Override
+	public void scheduleRunAsync(Runnable runnable, long delay) {
+		checkNotNull(runnable, "runnable");
+		checkArgument(delay >= 0, "delay must be zero or greater");
+		
 		// Unfortunately I couldn't find a way to allow only local communication. Therefore, the
 		// runnable field is transient transient
-		rpcServer.tell(new RunAsync(runnable), ActorRef.noSender());
+		rpcServer.tell(new RunAsync(runnable, delay), ActorRef.noSender());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 57da38a..18ccf1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
+import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
@@ -30,9 +31,11 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and {@link CallAsync}
@@ -152,13 +155,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 				"{} is only supported with local communication.",
 				runAsync.getClass().getName(),
 				runAsync.getClass().getName());
-		} else {
+		}
+		else if (runAsync.getDelay() == 0) {
+			// run immediately
 			try {
 				runAsync.getRunnable().run();
 			} catch (final Throwable e) {
 				LOG.error("Caught exception while executing runnable in main thread.", e);
 			}
 		}
+		else {
+			// schedule for later. send a new message after the delay, which will then be immediately executed 
+			FiniteDuration delay = new FiniteDuration(runAsync.getDelay(), TimeUnit.MILLISECONDS);
+			RunAsync message = new RunAsync(runAsync.getRunnable(), 0);
+
+			getContext().system().scheduler().scheduleOnce(delay, getSelf(), message,
+					getContext().dispatcher(), ActorRef.noSender());
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
index fb95852..c18906c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -18,23 +18,39 @@
 
 package org.apache.flink.runtime.rpc.akka.messages;
 
-import org.apache.flink.util.Preconditions;
-
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Message for asynchronous runnable invocations
  */
 public final class RunAsync implements Serializable {
 	private static final long serialVersionUID = -3080595100695371036L;
 
+	/** The runnable to be executed. Transient, so it gets lost upon serialization */ 
 	private final transient Runnable runnable;
 
-	public RunAsync(Runnable runnable) {
-		this.runnable = Preconditions.checkNotNull(runnable);
+	/** The delay after which the runnable should be called */
+	private final long delay;
+
+	/**
+	 * 
+	 * @param runnable  The Runnable to run.
+	 * @param delay     The delay in milliseconds. Zero indicates immediate execution.
+	 */
+	public RunAsync(Runnable runnable, long delay) {
+		checkArgument(delay >= 0);
+		this.runnable = checkNotNull(runnable);
+		this.delay = delay;
 	}
 
 	public Runnable getRunnable() {
 		return runnable;
 	}
+
+	public long getDelay() {
+		return delay;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index a4e1d7f..5e37e10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
new file mode 100644
index 0000000..f2ce52d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.*;
+
+public class AsyncCallsTest {
+
+	// ------------------------------------------------------------------------
+	//  shared test members
+	// ------------------------------------------------------------------------
+
+	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+	private static AkkaRpcService akkaRpcService = 
+			new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+
+	@AfterClass
+	public static void shutdown() {
+		akkaRpcService.stopService();
+		actorSystem.shutdown();
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testScheduleWithNoDelay() throws Exception {
+
+		// to collect all the thread references
+		final ReentrantLock lock = new ReentrantLock();
+		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+
+		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+		TestGateway gateway = testEndpoint.getSelf();
+
+		// a bunch of gateway calls
+		gateway.someCall();
+		gateway.anotherCall();
+		gateway.someCall();
+
+		// run something asynchronously
+		for (int i = 0; i < 10000; i++) {
+			testEndpoint.runAsync(new Runnable() {
+				@Override
+				public void run() {
+					boolean holdsLock = lock.tryLock();
+					if (holdsLock) {
+						lock.unlock();
+					} else {
+						concurrentAccess.set(true);
+					}
+				}
+			});
+		}
+	
+		Future<String> result = testEndpoint.callAsync(new Callable<String>() {
+			@Override
+			public String call() throws Exception {
+				boolean holdsLock = lock.tryLock();
+				if (holdsLock) {
+					lock.unlock();
+				} else {
+					concurrentAccess.set(true);
+				}
+				return "test";
+			}
+		}, new Timeout(30, TimeUnit.SECONDS));
+		String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS));
+		assertEquals("test", str);
+
+		// validate that no concurrent access happened
+		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
+		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+		akkaRpcService.stopServer(testEndpoint.getSelf());
+	}
+
+	@Test
+	public void testScheduleWithDelay() throws Exception {
+
+		// to collect all the thread references
+		final ReentrantLock lock = new ReentrantLock();
+		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+		final OneShotLatch latch = new OneShotLatch();
+
+		final long delay = 200;
+
+		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+
+		// run something asynchronously
+		testEndpoint.runAsync(new Runnable() {
+			@Override
+			public void run() {
+				boolean holdsLock = lock.tryLock();
+				if (holdsLock) {
+					lock.unlock();
+				} else {
+					concurrentAccess.set(true);
+				}
+			}
+		});
+
+		final long start = System.nanoTime();
+
+		testEndpoint.scheduleRunAsync(new Runnable() {
+			@Override
+			public void run() {
+				boolean holdsLock = lock.tryLock();
+				if (holdsLock) {
+					lock.unlock();
+				} else {
+					concurrentAccess.set(true);
+				}
+				latch.trigger();
+			}
+		}, delay, TimeUnit.MILLISECONDS);
+
+		latch.await();
+		final long stop = System.nanoTime();
+
+		// validate that no concurrent access happened
+		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
+		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+		assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test RPC endpoint
+	// ------------------------------------------------------------------------
+	
+	interface TestGateway extends RpcGateway {
+
+		void someCall();
+
+		void anotherCall();
+	}
+
+	@SuppressWarnings("unused")
+	public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+		private final ReentrantLock lock;
+
+		private volatile boolean concurrentAccess;
+
+		public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
+			super(rpcService);
+			this.lock = lock;
+		}
+
+		@RpcMethod
+		public void someCall() {
+			boolean holdsLock = lock.tryLock();
+			if (holdsLock) {
+				lock.unlock();
+			} else {
+				concurrentAccess = true;
+			}
+		}
+
+		@RpcMethod
+		public void anotherCall() {
+			boolean holdsLock = lock.tryLock();
+			if (holdsLock) {
+				lock.unlock();
+			} else {
+				concurrentAccess = true;
+			}
+		}
+
+		public boolean hasConcurrentAccess() {
+			return concurrentAccess;
+		}
+	}
+}