You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:55 UTC

[22/50] [abbrv] flink git commit: [FLINK-4386] [rpc] Add a utility to verify calls happen in the Rpc Endpoint's main thread

[FLINK-4386] [rpc] Add a utility to verify calls happen in the Rpc Endpoint's main thread


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ca049b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ca049b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ca049b5

Branch: refs/heads/flip-6
Commit: 4ca049b5c5d7c543a2e0ec40534cd08a13cd113a
Parents: 86f21bf
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 11 20:30:54 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 21 11:39:13 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/MainThreadExecutor.java   |  2 +-
 .../runtime/rpc/MainThreadValidatorUtil.java    | 47 ++++++++++
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 38 +++++++-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 37 +++++---
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  2 +-
 .../rpc/akka/MainThreadValidationTest.java      | 97 ++++++++++++++++++++
 6 files changed, 205 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index 4efb382..5e4fead 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeoutException;
  *
  * <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
  * implementation which allows to dispatch local procedures to the main thread of the underlying
- * rpc server.
+ * RPC endpoint.
  */
 public interface MainThreadExecutor {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
new file mode 100644
index 0000000..b3fea77
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This utility bridges the restricted visibility of the
+ * {@code currentMainThread} field in the {@link RpcEndpoint}.
+ * 
+ * <p>That way, {@code currentMainThread} can stay hidden from {@code RpcEndpoint} implementations
+ * and be accessed from other packages only via this utility.
+ */
+public final class MainThreadValidatorUtil {
+
+	private final RpcEndpoint<?> endpoint;
+
+	public MainThreadValidatorUtil(RpcEndpoint<?> endpoint) {
+		this.endpoint = checkNotNull(endpoint);
+	}
+
+	public void enterMainThread() {
+		assert(endpoint.currentMainThread.compareAndSet(null, Thread.currentThread())) : 
+				"The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get();
+	}
+	
+	public void exitMainThread() {
+		assert(endpoint.currentMainThread.compareAndSet(Thread.currentThread(), null)) :
+				"The RpcEndpoint has concurrent access from " + endpoint.currentMainThread.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 44933d5..d36a283 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -29,6 +29,7 @@ import scala.concurrent.Future;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -75,6 +76,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * of the executing rpc server. */
 	private final MainThreadExecutionContext mainThreadExecutionContext;
 
+	/** A reference to the thread currently acting as the endpoint's main thread, or null while no main-thread call is in progress */
+	final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null); 
+
 	/**
 	 * Initializes the RPC endpoint.
 	 * 
@@ -92,6 +96,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 		this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
 	}
 
+	/**
+	 * Returns the class of the self gateway type.
+	 *
+	 * @return Class of the self gateway type
+	 */
+	public final Class<C> getSelfGatewayType() {
+		return selfGatewayType;
+	}
+	
 	// ------------------------------------------------------------------------
 	//  Shutdown
 	// ------------------------------------------------------------------------
@@ -193,13 +206,28 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 		return ((MainThreadExecutor) self).callAsync(callable, timeout);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Main Thread Validation
+	// ------------------------------------------------------------------------
+
 	/**
-	 * Returns the class of the self gateway type.
-	 *
-	 * @return Class of the self gateway type
+	 * Validates that the method call happens in the RPC endpoint's main thread.
+	 * 
+	 * <p><b>IMPORTANT:</b> This check only happens when assertions are enabled,
+	 * such as when running tests.
+	 * 
+	 * <p>This can be used for additional checks, like
+	 * <pre>{@code
+	 * protected void concurrencyCriticalMethod() {
+	 *     validateRunsInMainThread();
+	 *     
+	 *     // some critical stuff
+	 * }
+	 * }</pre>
 	 */
-	public final Class<C> getSelfGatewayType() {
-		return selfGatewayType;
+	public void validateRunsInMainThread() {
+		// the main thread reference is only set while a message is being processed (see MainThreadValidatorUtil)
+		assert currentMainThread.get() == Thread.currentThread();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 18ccf1b..5e0a7da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -22,14 +22,16 @@ import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
-import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -37,6 +39,8 @@ import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and {@link CallAsync}
  * messages.
@@ -51,24 +55,35 @@ import java.util.concurrent.TimeUnit;
  * @param <T> Type of the {@link RpcEndpoint}
  */
 class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActor {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class);
 
+	/** the endpoint to invoke the methods on */
 	private final T rpcEndpoint;
 
+	/** the helper that tracks whether calls come from the main thread */
+	private final MainThreadValidatorUtil mainThreadValidator;
+
 	AkkaRpcActor(final T rpcEndpoint) {
-		this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");
+		this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
+		this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
 	}
 
 	@Override
-	public void onReceive(final Object message)  {
-		if (message instanceof RunAsync) {
-			handleRunAsync((RunAsync) message);
-		} else if (message instanceof CallAsync) {
-			handleCallAsync((CallAsync) message);
-		} else if (message instanceof RpcInvocation) {
-			handleRpcInvocation((RpcInvocation) message);
-		} else {
-			LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass());
+	public void onReceive(final Object message) {
+		mainThreadValidator.enterMainThread();
+		try {
+			if (message instanceof RunAsync) {
+				handleRunAsync((RunAsync) message);
+			} else if (message instanceof CallAsync) {
+				handleCallAsync((CallAsync) message);
+			} else if (message instanceof RpcInvocation) {
+				handleRpcInvocation((RpcInvocation) message);
+			} else {
+				LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass());
+			}
+		} finally {
+			mainThreadValidator.exitMainThread();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 448216c..db40f10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -174,7 +174,7 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
-	public <C extends RpcGateway> String getAddress(C selfGateway) {
+	public String getAddress(RpcGateway selfGateway) {
 		checkState(!stopped, "RpcService is stopped");
 
 		if (selfGateway instanceof AkkaGateway) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
new file mode 100644
index 0000000..b854143
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.util.Timeout;
+
+import org.apache.flink.runtime.akka.AkkaUtils;
+
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class MainThreadValidationTest {
+
+	@Test
+	public void failIfNotInMainThread() {
+		// check whether assertions are enabled. The test only works if assertions are enabled.
+		try {
+			assert false;
+			// apparently they are not activated
+			return;
+		} catch (AssertionError ignored) {}
+
+		// actual test
+		AkkaRpcService akkaRpcService = new AkkaRpcService(
+				AkkaUtils.createDefaultActorSystem(),
+				new Timeout(10000, TimeUnit.MILLISECONDS));
+
+		try {
+			TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService);
+
+			// this works, because it is executed as an RPC call
+			testEndpoint.getSelf().someConcurrencyCriticalFunction();
+
+			// this fails, because it is executed directly
+			boolean exceptionThrown;
+			try {
+				testEndpoint.someConcurrencyCriticalFunction();
+				exceptionThrown = false;
+			}
+			catch (AssertionError e) {
+				exceptionThrown = true;
+			}
+			assertTrue("should fail with an assertion error", exceptionThrown);
+
+			akkaRpcService.stopServer(testEndpoint.getSelf());
+		}
+		finally {
+			akkaRpcService.stopService();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test RPC endpoint
+	// ------------------------------------------------------------------------
+
+	interface TestGateway extends RpcGateway {
+
+		void someConcurrencyCriticalFunction();
+	}
+
+	@SuppressWarnings("unused")
+	public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+		public TestEndpoint(RpcService rpcService) {
+			super(rpcService);
+		}
+
+		@RpcMethod
+		public void someConcurrencyCriticalFunction() {
+			validateRunsInMainThread();
+		}
+	}
+}