You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/28 08:20:47 UTC
[04/50] [abbrv] flink git commit: [FLINK-4386] [rpc] Add a utility to
verify calls happen in the Rpc Endpoint's main thread
[FLINK-4386] [rpc] Add a utility to verify calls happen in the Rpc Endpoint's main thread
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0d5b7b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0d5b7b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0d5b7b8
Branch: refs/heads/flip-6
Commit: f0d5b7b8f45208ebfdb659f83cbe00fb4c91fee0
Parents: d881403
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 11 20:30:54 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 19:24:55 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/rpc/MainThreadExecutor.java | 2 +-
.../runtime/rpc/MainThreadValidatorUtil.java | 47 ++++++++++
.../apache/flink/runtime/rpc/RpcEndpoint.java | 38 +++++++-
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 37 +++++---
.../flink/runtime/rpc/akka/AkkaRpcService.java | 2 +-
.../rpc/akka/MainThreadValidationTest.java | 97 ++++++++++++++++++++
6 files changed, 205 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d5b7b8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index 4efb382..5e4fead 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeoutException;
*
* <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
* implementation which allows to dispatch local procedures to the main thread of the underlying
- * rpc server.
+ * RPC endpoint.
*/
public interface MainThreadExecutor {
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d5b7b8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
new file mode 100644
index 0000000..b3fea77
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This utility exists to bridge between the visibility of the
+ * {@code currentMainThread} field in the {@link RpcEndpoint}.
+ *
+ * The {@code currentMainThread} can be hidden from {@code RpcEndpoint} implementations
+ * and only be accessed via this utility from other packages.
+ */
+public final class MainThreadValidatorUtil {
+
+ private final RpcEndpoint<?> endpoint;
+
+ public MainThreadValidatorUtil(RpcEndpoint<?> endpoint) {
+ this.endpoint = checkNotNull(endpoint);
+ }
+
+ public void enterMainThread() {
+ assert(endpoint.currentMainThread.compareAndSet(null, Thread.currentThread())) :
+ "The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get();
+ }
+
+ public void exitMainThread() {
+ assert(endpoint.currentMainThread.compareAndSet(Thread.currentThread(), null)) :
+ "The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d5b7b8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 44933d5..d36a283 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -29,6 +29,7 @@ import scala.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,6 +76,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* of the executing rpc server. */
private final MainThreadExecutionContext mainThreadExecutionContext;
+ /** A reference to the endpoint's main thread, if the current method is called by the main thread */
+ final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null);
+
/**
* Initializes the RPC endpoint.
*
@@ -92,6 +96,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
}
+ /**
+ * Returns the class of the self gateway type.
+ *
+ * @return Class of the self gateway type
+ */
+ public final Class<C> getSelfGatewayType() {
+ return selfGatewayType;
+ }
+
// ------------------------------------------------------------------------
// Shutdown
// ------------------------------------------------------------------------
@@ -193,13 +206,28 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
return ((MainThreadExecutor) self).callAsync(callable, timeout);
}
+ // ------------------------------------------------------------------------
+ // Main Thread Validation
+ // ------------------------------------------------------------------------
+
/**
- * Returns the class of the self gateway type.
- *
- * @return Class of the self gateway type
+ * Validates that the method call happens in the RPC endpoint's main thread.
+ *
+ * <p><b>IMPORTANT:</b> This check only happens when assertions are enabled,
+ * such as when running tests.
+ *
+ * <p>This can be used for additional checks, like
+ * <pre>{@code
+ * protected void concurrencyCriticalMethod() {
+ * validateRunsInMainThread();
+ *
+ * // some critical stuff
+ * }
+ * }</pre>
*/
- public final Class<C> getSelfGatewayType() {
- return selfGatewayType;
+ public void validateRunsInMainThread() {
+ // because the initialization is lazy, it can be that certain methods are
+ assert currentMainThread.get() == Thread.currentThread();
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d5b7b8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 18ccf1b..5e0a7da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -22,14 +22,16 @@ import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
-import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -37,6 +39,8 @@ import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and {@link CallAsync}
* messages.
@@ -51,24 +55,35 @@ import java.util.concurrent.TimeUnit;
* @param <T> Type of the {@link RpcEndpoint}
*/
class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActor {
+
private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class);
+ /** the endpoint to invoke the methods on */
private final T rpcEndpoint;
+ /** the helper that tracks whether calls come from the main thread */
+ private final MainThreadValidatorUtil mainThreadValidator;
+
AkkaRpcActor(final T rpcEndpoint) {
- this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");
+ this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
+ this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
}
@Override
- public void onReceive(final Object message) {
- if (message instanceof RunAsync) {
- handleRunAsync((RunAsync) message);
- } else if (message instanceof CallAsync) {
- handleCallAsync((CallAsync) message);
- } else if (message instanceof RpcInvocation) {
- handleRpcInvocation((RpcInvocation) message);
- } else {
- LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass());
+ public void onReceive(final Object message) {
+ mainThreadValidator.enterMainThread();
+ try {
+ if (message instanceof RunAsync) {
+ handleRunAsync((RunAsync) message);
+ } else if (message instanceof CallAsync) {
+ handleCallAsync((CallAsync) message);
+ } else if (message instanceof RpcInvocation) {
+ handleRpcInvocation((RpcInvocation) message);
+ } else {
+ LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass());
+ }
+ } finally {
+ mainThreadValidator.exitMainThread();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d5b7b8/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 448216c..db40f10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -174,7 +174,7 @@ public class AkkaRpcService implements RpcService {
}
@Override
- public <C extends RpcGateway> String getAddress(C selfGateway) {
+ public String getAddress(RpcGateway selfGateway) {
checkState(!stopped, "RpcService is stopped");
if (selfGateway instanceof AkkaGateway) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f0d5b7b8/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
new file mode 100644
index 0000000..b854143
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.util.Timeout;
+
+import org.apache.flink.runtime.akka.AkkaUtils;
+
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class MainThreadValidationTest {
+
+ @Test
+ public void failIfNotInMainThread() {
+ // test if assertions are activated. The test only works if assertions are loaded.
+ try {
+ assert false;
+ // apparently they are not activated
+ return;
+ } catch (AssertionError ignored) {}
+
+ // actual test
+ AkkaRpcService akkaRpcService = new AkkaRpcService(
+ AkkaUtils.createDefaultActorSystem(),
+ new Timeout(10000, TimeUnit.MILLISECONDS));
+
+ try {
+ TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
+
+ // this works, because it is executed as an RPC call
+ testEndpoint.getSelf().someConcurrencyCriticalFunction();
+
+ // this fails, because it is executed directly
+ boolean exceptionThrown;
+ try {
+ testEndpoint.someConcurrencyCriticalFunction();
+ exceptionThrown = false;
+ }
+ catch (AssertionError e) {
+ exceptionThrown = true;
+ }
+ assertTrue("should fail with an assertion error", exceptionThrown);
+
+ akkaRpcService.stopServer(testEndpoint.getSelf());
+ }
+ finally {
+ akkaRpcService.stopService();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test RPC endpoint
+ // ------------------------------------------------------------------------
+
+ interface TestGateway extends RpcGateway {
+
+ void someConcurrencyCriticalFunction();
+ }
+
+ @SuppressWarnings("unused")
+ public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+ public TestEndpoint(RpcService rpcService) {
+ super(rpcService);
+ }
+
+ @RpcMethod
+ public void someConcurrencyCriticalFunction() {
+ validateRunsInMainThread();
+ }
+ }
+}