You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/06 11:48:38 UTC

[18/50] [abbrv] flink git commit: [FLINK-4528] [rpc] Marks main thread execution methods in RpcEndpoint as protected

[FLINK-4528] [rpc] Marks main thread execution methods in RpcEndpoint as protected

Give main thread execution context into the TaskExecutorToResourceManagerConnection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99228c21
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99228c21
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99228c21

Branch: refs/heads/flip-6
Commit: 99228c2104347728f198c1b50728771088a8a744
Parents: aed3f38
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Aug 29 15:49:59 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 6 13:38:40 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |   8 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   7 +-
 ...TaskExecutorToResourceManagerConnection.java |  26 ++-
 .../flink/runtime/rpc/AsyncCallsTest.java       | 216 ++++++++++++++++++
 .../flink/runtime/rpc/akka/AsyncCallsTest.java  | 219 -------------------
 5 files changed, 242 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99228c21/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 7b3f8a1..e9e2b2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -161,7 +161,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 *
 	 * @return Main thread execution context
 	 */
-	public ExecutionContext getMainThreadExecutionContext() {
+	protected ExecutionContext getMainThreadExecutionContext() {
 		return mainThreadExecutionContext;
 	}
 
@@ -184,7 +184,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 *
 	 * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint
 	 */
-	public void runAsync(Runnable runnable) {
+	protected void runAsync(Runnable runnable) {
 		((MainThreadExecutor) self).runAsync(runnable);
 	}
 
@@ -195,7 +195,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * @param runnable Runnable to be executed
 	 * @param delay    The delay after which the runnable will be executed
 	 */
-	public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
+	protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
 		((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay));
 	}
 
@@ -209,7 +209,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * @param <V> Return type of the callable
 	 * @return Future for the result of the callable.
 	 */
-	public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+	protected <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
 		return ((MainThreadExecutor) self).callAsync(callable, timeout);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/99228c21/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4871b96..735730b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -176,7 +176,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		if (newLeaderAddress != null) {
 			log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
 			resourceManagerConnection =
-				new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId);
+				new TaskExecutorToResourceManagerConnection(
+					log,
+					this,
+					newLeaderAddress,
+					newLeaderId,
+					getMainThreadExecutionContext());
 			resourceManagerConnection.start();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/99228c21/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 25332a0..28062b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 
 import org.slf4j.Logger;
 
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -55,9 +56,12 @@ public class TaskExecutorToResourceManagerConnection {
 
 	private final String resourceManagerAddress;
 
+	/** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */
+	private final ExecutionContext executionContext;
+
 	private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration;
 
-	private ResourceManagerGateway registeredResourceManager;
+	private volatile ResourceManagerGateway registeredResourceManager;
 
 	private InstanceID registrationId;
 
@@ -66,15 +70,17 @@ public class TaskExecutorToResourceManagerConnection {
 
 
 	public TaskExecutorToResourceManagerConnection(
-			Logger log,
-			TaskExecutor taskExecutor,
-			String resourceManagerAddress,
-			UUID resourceManagerLeaderId) {
+		Logger log,
+		TaskExecutor taskExecutor,
+		String resourceManagerAddress,
+		UUID resourceManagerLeaderId,
+		ExecutionContext executionContext) {
 
 		this.log = checkNotNull(log);
 		this.taskExecutor = checkNotNull(taskExecutor);
 		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
 		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+		this.executionContext = checkNotNull(executionContext);
 	}
 
 	// ------------------------------------------------------------------------
@@ -93,22 +99,22 @@ public class TaskExecutorToResourceManagerConnection {
 		pendingRegistration.startRegistration();
 
 		Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
-		
+
 		future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
 			@Override
 			public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
-				registeredResourceManager = result.f0;
 				registrationId = result.f1.getRegistrationId();
+				registeredResourceManager = result.f0;
 			}
-		}, taskExecutor.getMainThreadExecutionContext());
+		}, executionContext);
 		
 		// this future should only ever fail if there is a bug, not if the registration is declined
 		future.onFailure(new OnFailure() {
 			@Override
 			public void onFailure(Throwable failure) {
-				taskExecutor.onFatalError(failure);
+				taskExecutor.onFatalErrorAsync(failure);
 			}
-		}, taskExecutor.getMainThreadExecutionContext());
+		}, executionContext);
 	}
 
 	public void close() {

http://git-wip-us.apache.org/repos/asf/flink/blob/99228c21/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
new file mode 100644
index 0000000..1791056
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.AkkaUtils;
+
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.*;
+
+public class AsyncCallsTest extends TestLogger {
+
+	// ------------------------------------------------------------------------
+	//  shared test members
+	// ------------------------------------------------------------------------
+
+	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+	private static AkkaRpcService akkaRpcService =
+			new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+
+	@AfterClass
+	public static void shutdown() {
+		akkaRpcService.stopService();
+		actorSystem.shutdown();
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testScheduleWithNoDelay() throws Exception {
+
+		// to collect all the thread references
+		final ReentrantLock lock = new ReentrantLock();
+		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+
+		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+		testEndpoint.start();
+		TestGateway gateway = testEndpoint.getSelf();
+
+		// a bunch of gateway calls
+		gateway.someCall();
+		gateway.anotherCall();
+		gateway.someCall();
+
+		// run something asynchronously
+		for (int i = 0; i < 10000; i++) {
+			testEndpoint.runAsync(new Runnable() {
+				@Override
+				public void run() {
+					boolean holdsLock = lock.tryLock();
+					if (holdsLock) {
+						lock.unlock();
+					} else {
+						concurrentAccess.set(true);
+					}
+				}
+			});
+		}
+	
+		Future<String> result = testEndpoint.callAsync(new Callable<String>() {
+			@Override
+			public String call() throws Exception {
+				boolean holdsLock = lock.tryLock();
+				if (holdsLock) {
+					lock.unlock();
+				} else {
+					concurrentAccess.set(true);
+				}
+				return "test";
+			}
+		}, new Timeout(30, TimeUnit.SECONDS));
+		String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS));
+		assertEquals("test", str);
+
+		// validate that no concurrent access happened
+		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
+		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+		akkaRpcService.stopServer(testEndpoint.getSelf());
+	}
+
+	@Test
+	public void testScheduleWithDelay() throws Exception {
+
+		// to collect all the thread references
+		final ReentrantLock lock = new ReentrantLock();
+		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+		final OneShotLatch latch = new OneShotLatch();
+
+		final long delay = 200;
+
+		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+		testEndpoint.start();
+
+		// run something asynchronously
+		testEndpoint.runAsync(new Runnable() {
+			@Override
+			public void run() {
+				boolean holdsLock = lock.tryLock();
+				if (holdsLock) {
+					lock.unlock();
+				} else {
+					concurrentAccess.set(true);
+				}
+			}
+		});
+
+		final long start = System.nanoTime();
+
+		testEndpoint.scheduleRunAsync(new Runnable() {
+			@Override
+			public void run() {
+				boolean holdsLock = lock.tryLock();
+				if (holdsLock) {
+					lock.unlock();
+				} else {
+					concurrentAccess.set(true);
+				}
+				latch.trigger();
+			}
+		}, delay, TimeUnit.MILLISECONDS);
+
+		latch.await();
+		final long stop = System.nanoTime();
+
+		// validate that no concurrent access happened
+		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
+		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+		assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test RPC endpoint
+	// ------------------------------------------------------------------------
+	
+	public interface TestGateway extends RpcGateway {
+
+		void someCall();
+
+		void anotherCall();
+	}
+
+	@SuppressWarnings("unused")
+	public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+		private final ReentrantLock lock;
+
+		private volatile boolean concurrentAccess;
+
+		public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
+			super(rpcService);
+			this.lock = lock;
+		}
+
+		@RpcMethod
+		public void someCall() {
+			boolean holdsLock = lock.tryLock();
+			if (holdsLock) {
+				lock.unlock();
+			} else {
+				concurrentAccess = true;
+			}
+		}
+
+		@RpcMethod
+		public void anotherCall() {
+			boolean holdsLock = lock.tryLock();
+			if (holdsLock) {
+				lock.unlock();
+			} else {
+				concurrentAccess = true;
+			}
+		}
+
+		public boolean hasConcurrentAccess() {
+			return concurrentAccess;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/99228c21/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
deleted file mode 100644
index d33987c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.ActorSystem;
-import akka.util.Timeout;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.*;
-
-public class AsyncCallsTest extends TestLogger {
-
-	// ------------------------------------------------------------------------
-	//  shared test members
-	// ------------------------------------------------------------------------
-
-	private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-
-	private static AkkaRpcService akkaRpcService = 
-			new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
-
-	@AfterClass
-	public static void shutdown() {
-		akkaRpcService.stopService();
-		actorSystem.shutdown();
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  tests
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testScheduleWithNoDelay() throws Exception {
-
-		// to collect all the thread references
-		final ReentrantLock lock = new ReentrantLock();
-		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
-
-		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
-		testEndpoint.start();
-		TestGateway gateway = testEndpoint.getSelf();
-
-		// a bunch of gateway calls
-		gateway.someCall();
-		gateway.anotherCall();
-		gateway.someCall();
-
-		// run something asynchronously
-		for (int i = 0; i < 10000; i++) {
-			testEndpoint.runAsync(new Runnable() {
-				@Override
-				public void run() {
-					boolean holdsLock = lock.tryLock();
-					if (holdsLock) {
-						lock.unlock();
-					} else {
-						concurrentAccess.set(true);
-					}
-				}
-			});
-		}
-	
-		Future<String> result = testEndpoint.callAsync(new Callable<String>() {
-			@Override
-			public String call() throws Exception {
-				boolean holdsLock = lock.tryLock();
-				if (holdsLock) {
-					lock.unlock();
-				} else {
-					concurrentAccess.set(true);
-				}
-				return "test";
-			}
-		}, new Timeout(30, TimeUnit.SECONDS));
-		String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS));
-		assertEquals("test", str);
-
-		// validate that no concurrent access happened
-		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
-		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
-
-		akkaRpcService.stopServer(testEndpoint.getSelf());
-	}
-
-	@Test
-	public void testScheduleWithDelay() throws Exception {
-
-		// to collect all the thread references
-		final ReentrantLock lock = new ReentrantLock();
-		final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
-		final OneShotLatch latch = new OneShotLatch();
-
-		final long delay = 200;
-
-		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
-		testEndpoint.start();
-
-		// run something asynchronously
-		testEndpoint.runAsync(new Runnable() {
-			@Override
-			public void run() {
-				boolean holdsLock = lock.tryLock();
-				if (holdsLock) {
-					lock.unlock();
-				} else {
-					concurrentAccess.set(true);
-				}
-			}
-		});
-
-		final long start = System.nanoTime();
-
-		testEndpoint.scheduleRunAsync(new Runnable() {
-			@Override
-			public void run() {
-				boolean holdsLock = lock.tryLock();
-				if (holdsLock) {
-					lock.unlock();
-				} else {
-					concurrentAccess.set(true);
-				}
-				latch.trigger();
-			}
-		}, delay, TimeUnit.MILLISECONDS);
-
-		latch.await();
-		final long stop = System.nanoTime();
-
-		// validate that no concurrent access happened
-		assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
-		assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
-
-		assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
-	}
-
-	// ------------------------------------------------------------------------
-	//  test RPC endpoint
-	// ------------------------------------------------------------------------
-	
-	interface TestGateway extends RpcGateway {
-
-		void someCall();
-
-		void anotherCall();
-	}
-
-	@SuppressWarnings("unused")
-	public static class TestEndpoint extends RpcEndpoint<TestGateway> {
-
-		private final ReentrantLock lock;
-
-		private volatile boolean concurrentAccess;
-
-		public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
-			super(rpcService);
-			this.lock = lock;
-		}
-
-		@RpcMethod
-		public void someCall() {
-			boolean holdsLock = lock.tryLock();
-			if (holdsLock) {
-				lock.unlock();
-			} else {
-				concurrentAccess = true;
-			}
-		}
-
-		@RpcMethod
-		public void anotherCall() {
-			boolean holdsLock = lock.tryLock();
-			if (holdsLock) {
-				lock.unlock();
-			} else {
-				concurrentAccess = true;
-			}
-		}
-
-		public boolean hasConcurrentAccess() {
-			return concurrentAccess;
-		}
-	}
-}