You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/28 08:21:04 UTC
[21/50] [abbrv] flink git commit: [FLINK-4528] [rpc] Marks main
thread execution methods in RpcEndpoint as protected
[FLINK-4528] [rpc] Marks main thread execution methods in RpcEndpoint as protected
Pass the main thread execution context into the TaskExecutorToResourceManagerConnection
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/156f0ff5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/156f0ff5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/156f0ff5
Branch: refs/heads/flip-6
Commit: 156f0ff5403b86be6dba63d38117952a1b08350f
Parents: 171cfd3
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Aug 29 15:49:59 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 19:24:58 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rpc/RpcEndpoint.java | 8 +-
.../runtime/taskexecutor/TaskExecutor.java | 7 +-
...TaskExecutorToResourceManagerConnection.java | 26 ++-
.../flink/runtime/rpc/AsyncCallsTest.java | 216 ++++++++++++++++++
.../flink/runtime/rpc/akka/AsyncCallsTest.java | 219 -------------------
5 files changed, 242 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/156f0ff5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 7b3f8a1..e9e2b2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -161,7 +161,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
*
* @return Main thread execution context
*/
- public ExecutionContext getMainThreadExecutionContext() {
+ protected ExecutionContext getMainThreadExecutionContext() {
return mainThreadExecutionContext;
}
@@ -184,7 +184,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
*
* @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint
*/
- public void runAsync(Runnable runnable) {
+ protected void runAsync(Runnable runnable) {
((MainThreadExecutor) self).runAsync(runnable);
}
@@ -195,7 +195,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* @param runnable Runnable to be executed
* @param delay The delay after which the runnable will be executed
*/
- public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
+ protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay));
}
@@ -209,7 +209,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* @param <V> Return type of the callable
* @return Future for the result of the callable.
*/
- public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+ protected <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
return ((MainThreadExecutor) self).callAsync(callable, timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/156f0ff5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4871b96..735730b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -176,7 +176,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
if (newLeaderAddress != null) {
log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
resourceManagerConnection =
- new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId);
+ new TaskExecutorToResourceManagerConnection(
+ log,
+ this,
+ newLeaderAddress,
+ newLeaderId,
+ getMainThreadExecutionContext());
resourceManagerConnection.start();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/156f0ff5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 25332a0..28062b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.slf4j.Logger;
+import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -55,9 +56,12 @@ public class TaskExecutorToResourceManagerConnection {
private final String resourceManagerAddress;
+ /** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */
+ private final ExecutionContext executionContext;
+
private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration;
- private ResourceManagerGateway registeredResourceManager;
+ private volatile ResourceManagerGateway registeredResourceManager;
private InstanceID registrationId;
@@ -66,15 +70,17 @@ public class TaskExecutorToResourceManagerConnection {
public TaskExecutorToResourceManagerConnection(
- Logger log,
- TaskExecutor taskExecutor,
- String resourceManagerAddress,
- UUID resourceManagerLeaderId) {
+ Logger log,
+ TaskExecutor taskExecutor,
+ String resourceManagerAddress,
+ UUID resourceManagerLeaderId,
+ ExecutionContext executionContext) {
this.log = checkNotNull(log);
this.taskExecutor = checkNotNull(taskExecutor);
this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+ this.executionContext = checkNotNull(executionContext);
}
// ------------------------------------------------------------------------
@@ -93,22 +99,22 @@ public class TaskExecutorToResourceManagerConnection {
pendingRegistration.startRegistration();
Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
-
+
future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() {
@Override
public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) {
- registeredResourceManager = result.f0;
registrationId = result.f1.getRegistrationId();
+ registeredResourceManager = result.f0;
}
- }, taskExecutor.getMainThreadExecutionContext());
+ }, executionContext);
// this future should only ever fail if there is a bug, not if the registration is declined
future.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) {
- taskExecutor.onFatalError(failure);
+ taskExecutor.onFatalErrorAsync(failure);
}
- }, taskExecutor.getMainThreadExecutionContext());
+ }, executionContext);
}
public void close() {
http://git-wip-us.apache.org/repos/asf/flink/blob/156f0ff5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
new file mode 100644
index 0000000..1791056
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.AkkaUtils;
+
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.*;
+
+public class AsyncCallsTest extends TestLogger {
+
+ // ------------------------------------------------------------------------
+ // shared test members
+ // ------------------------------------------------------------------------
+
+ private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+ private static AkkaRpcService akkaRpcService =
+ new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
+
+ @AfterClass
+ public static void shutdown() {
+ akkaRpcService.stopService();
+ actorSystem.shutdown();
+ }
+
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testScheduleWithNoDelay() throws Exception {
+
+ // lock and flag used to detect concurrent access to the endpoint
+ final ReentrantLock lock = new ReentrantLock();
+ final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+
+ TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+ testEndpoint.start();
+ TestGateway gateway = testEndpoint.getSelf();
+
+ // a bunch of gateway calls
+ gateway.someCall();
+ gateway.anotherCall();
+ gateway.someCall();
+
+ // run something asynchronously
+ for (int i = 0; i < 10000; i++) {
+ testEndpoint.runAsync(new Runnable() {
+ @Override
+ public void run() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ }
+ });
+ }
+
+ Future<String> result = testEndpoint.callAsync(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ return "test";
+ }
+ }, new Timeout(30, TimeUnit.SECONDS));
+ String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS));
+ assertEquals("test", str);
+
+ // validate that no concurrent access happened
+ assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
+ assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+ akkaRpcService.stopServer(testEndpoint.getSelf());
+ }
+
+ @Test
+ public void testScheduleWithDelay() throws Exception {
+
+ // lock and flag used to detect concurrent access to the endpoint
+ final ReentrantLock lock = new ReentrantLock();
+ final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+ final OneShotLatch latch = new OneShotLatch();
+
+ final long delay = 200;
+
+ TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
+ testEndpoint.start();
+
+ // run something asynchronously
+ testEndpoint.runAsync(new Runnable() {
+ @Override
+ public void run() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ }
+ });
+
+ final long start = System.nanoTime();
+
+ testEndpoint.scheduleRunAsync(new Runnable() {
+ @Override
+ public void run() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess.set(true);
+ }
+ latch.trigger();
+ }
+ }, delay, TimeUnit.MILLISECONDS);
+
+ latch.await();
+ final long stop = System.nanoTime();
+
+ // validate that no concurrent access happened
+ assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
+ assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
+
+ assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
+ }
+
+ // ------------------------------------------------------------------------
+ // test RPC endpoint
+ // ------------------------------------------------------------------------
+
+ public interface TestGateway extends RpcGateway {
+
+ void someCall();
+
+ void anotherCall();
+ }
+
+ @SuppressWarnings("unused")
+ public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+ private final ReentrantLock lock;
+
+ private volatile boolean concurrentAccess;
+
+ public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
+ super(rpcService);
+ this.lock = lock;
+ }
+
+ @RpcMethod
+ public void someCall() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess = true;
+ }
+ }
+
+ @RpcMethod
+ public void anotherCall() {
+ boolean holdsLock = lock.tryLock();
+ if (holdsLock) {
+ lock.unlock();
+ } else {
+ concurrentAccess = true;
+ }
+ }
+
+ public boolean hasConcurrentAccess() {
+ return concurrentAccess;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/156f0ff5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
deleted file mode 100644
index d33987c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.ActorSystem;
-import akka.util.Timeout;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.*;
-
-public class AsyncCallsTest extends TestLogger {
-
- // ------------------------------------------------------------------------
- // shared test members
- // ------------------------------------------------------------------------
-
- private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-
- private static AkkaRpcService akkaRpcService =
- new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS));
-
- @AfterClass
- public static void shutdown() {
- akkaRpcService.stopService();
- actorSystem.shutdown();
- }
-
-
- // ------------------------------------------------------------------------
- // tests
- // ------------------------------------------------------------------------
-
- @Test
- public void testScheduleWithNoDelay() throws Exception {
-
- // to collect all the thread references
- final ReentrantLock lock = new ReentrantLock();
- final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
-
- TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
- testEndpoint.start();
- TestGateway gateway = testEndpoint.getSelf();
-
- // a bunch of gateway calls
- gateway.someCall();
- gateway.anotherCall();
- gateway.someCall();
-
- // run something asynchronously
- for (int i = 0; i < 10000; i++) {
- testEndpoint.runAsync(new Runnable() {
- @Override
- public void run() {
- boolean holdsLock = lock.tryLock();
- if (holdsLock) {
- lock.unlock();
- } else {
- concurrentAccess.set(true);
- }
- }
- });
- }
-
- Future<String> result = testEndpoint.callAsync(new Callable<String>() {
- @Override
- public String call() throws Exception {
- boolean holdsLock = lock.tryLock();
- if (holdsLock) {
- lock.unlock();
- } else {
- concurrentAccess.set(true);
- }
- return "test";
- }
- }, new Timeout(30, TimeUnit.SECONDS));
- String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS));
- assertEquals("test", str);
-
- // validate that no concurrent access happened
- assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
- assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
-
- akkaRpcService.stopServer(testEndpoint.getSelf());
- }
-
- @Test
- public void testScheduleWithDelay() throws Exception {
-
- // to collect all the thread references
- final ReentrantLock lock = new ReentrantLock();
- final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
- final OneShotLatch latch = new OneShotLatch();
-
- final long delay = 200;
-
- TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
- testEndpoint.start();
-
- // run something asynchronously
- testEndpoint.runAsync(new Runnable() {
- @Override
- public void run() {
- boolean holdsLock = lock.tryLock();
- if (holdsLock) {
- lock.unlock();
- } else {
- concurrentAccess.set(true);
- }
- }
- });
-
- final long start = System.nanoTime();
-
- testEndpoint.scheduleRunAsync(new Runnable() {
- @Override
- public void run() {
- boolean holdsLock = lock.tryLock();
- if (holdsLock) {
- lock.unlock();
- } else {
- concurrentAccess.set(true);
- }
- latch.trigger();
- }
- }, delay, TimeUnit.MILLISECONDS);
-
- latch.await();
- final long stop = System.nanoTime();
-
- // validate that no concurrent access happened
- assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
- assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
-
- assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
- }
-
- // ------------------------------------------------------------------------
- // test RPC endpoint
- // ------------------------------------------------------------------------
-
- interface TestGateway extends RpcGateway {
-
- void someCall();
-
- void anotherCall();
- }
-
- @SuppressWarnings("unused")
- public static class TestEndpoint extends RpcEndpoint<TestGateway> {
-
- private final ReentrantLock lock;
-
- private volatile boolean concurrentAccess;
-
- public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
- super(rpcService);
- this.lock = lock;
- }
-
- @RpcMethod
- public void someCall() {
- boolean holdsLock = lock.tryLock();
- if (holdsLock) {
- lock.unlock();
- } else {
- concurrentAccess = true;
- }
- }
-
- @RpcMethod
- public void anotherCall() {
- boolean holdsLock = lock.tryLock();
- if (holdsLock) {
- lock.unlock();
- } else {
- concurrentAccess = true;
- }
- }
-
- public boolean hasConcurrentAccess() {
- return concurrentAccess;
- }
- }
-}