You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/18 08:04:03 UTC

[flink] 04/05: [FLINK-15966][runtime] Capture callstacks for RPC ask() calls to improve exceptions.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2746e6a4f3f84e05425484ad63ea096c5536c77f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 20:09:12 2020 +0100

    [FLINK-15966][runtime] Capture callstacks for RPC ask() calls to improve exceptions.
---
 docs/_includes/generated/akka_configuration.html   |   6 +
 .../apache/flink/configuration/AkkaOptions.java    |  12 ++
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  72 ++++++++---
 .../flink/runtime/rpc/akka/AkkaRpcService.java     |  19 ++-
 .../rpc/akka/AkkaRpcServiceConfiguration.java      |  19 ++-
 .../rpc/akka/FencedAkkaInvocationHandler.java      |   5 +-
 .../ResourceManagerTaskExecutorTest.java           |   7 +-
 .../runtime/rpc/akka/TimeoutCallStackTest.java     | 135 +++++++++++++++++++++
 8 files changed, 246 insertions(+), 29 deletions(-)

diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html
index c77a393..2baeb6f 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -9,6 +9,12 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>akka.ask.callstack</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If true, call stack for asynchronous asks are captured. That way, when an ask fails (for example times out), you get a proper exception, describing to the original method call and call site. Note that in case of having millions of concurrent RPC calls, this may add to the memory footprint.</td>
+        </tr>
+        <tr>
             <td><h5>akka.ask.timeout</h5></td>
             <td style="word-wrap: break-word;">"10 s"</td>
             <td>String</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index e71f18f..bdecb2e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -30,6 +30,18 @@ import static org.apache.flink.configuration.description.LinkElement.link;
 public class AkkaOptions {
 
 	/**
+	 * Flag whether to capture call stacks for RPC ask calls.
+	 */
+	public static final ConfigOption<Boolean> CAPTURE_ASK_CALLSTACK = ConfigOptions
+		.key("akka.ask.callstack")
+		.booleanType()
+		.defaultValue(true)
+		.withDescription("If true, call stack for asynchronous asks are captured. That way, when an ask fails " +
+			"(for example times out), you get a proper exception, describing to the original method call and " +
+			"call site. Note that in case of having millions of concurrent RPC calls, this may add to the " +
+			"memory footprint.");
+
+	/**
 	 * Timeout for akka ask calls.
 	 */
 	public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 44bfb3b..3b2a2cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -47,6 +47,7 @@ import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -90,13 +91,16 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 	@Nullable
 	private final CompletableFuture<Void> terminationFuture;
 
+	private final boolean captureAskCallStack;
+
 	AkkaInvocationHandler(
-			String address,
-			String hostname,
-			ActorRef rpcEndpoint,
-			Time timeout,
-			long maximumFramesize,
-			@Nullable CompletableFuture<Void> terminationFuture) {
+		String address,
+		String hostname,
+		ActorRef rpcEndpoint,
+		Time timeout,
+		long maximumFramesize,
+		@Nullable CompletableFuture<Void> terminationFuture,
+		boolean captureAskCallStack) {
 
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
@@ -105,6 +109,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.maximumFramesize = maximumFramesize;
 		this.terminationFuture = terminationFuture;
+		this.captureAskCallStack = captureAskCallStack;
 	}
 
 	@Override
@@ -208,20 +213,20 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 
 			result = null;
 		} else {
+			// Capture the call stack. Doing that via an exception is significantly faster than via
+			// Thread.getStackTrace(), because an exception initially captures only a lightweight native
+			// pointer and converts it into the actual stack trace lazily, when it is first needed.
+			final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;
+
 			// execute an asynchronous call
-			CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
-
-			CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
-				if (o instanceof SerializedValue) {
-					try {
-						return  ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
-					} catch (IOException | ClassNotFoundException e) {
-						throw new CompletionException(
-							new RpcException("Could not deserialize the serialized payload of RPC method : "
-								+ methodName, e));
-					}
+			final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
+
+			final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
+			resultFuture.whenComplete((resultValue, failure) -> {
+				if (failure != null) {
+					completableFuture.completeExceptionally(resolveTimeoutException(failure, callStackCapture, method));
 				} else {
-					return o;
+					completableFuture.complete(deserializeValueIfNeeded(resultValue, method));
 				}
 			});
 
@@ -370,4 +375,35 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
 	public CompletableFuture<Void> getTerminationFuture() {
 		return terminationFuture;
 	}
+
+	static Object deserializeValueIfNeeded(Object o, Method method) {
+		if (o instanceof SerializedValue) {
+			try {
+				return  ((SerializedValue<?>) o).deserializeValue(AkkaInvocationHandler.class.getClassLoader());
+			} catch (IOException | ClassNotFoundException e) {
+				throw new CompletionException(
+					new RpcException(
+						"Could not deserialize the serialized payload of RPC method : " + method.getName(), e));
+			}
+		} else {
+			return o;
+		}
+	}
+
+	static Throwable resolveTimeoutException(Throwable exception, @Nullable Throwable callStackCapture, Method method) {
+		if (!(exception instanceof akka.pattern.AskTimeoutException)) {
+			return exception;
+		}
+
+		final TimeoutException newException = new TimeoutException("Invocation of " + method + " timed out.");
+		newException.initCause(exception);
+
+		if (callStackCapture != null) {
+			// remove the stack frames coming from the proxy interface invocation
+			final StackTraceElement[] stackTrace = callStackCapture.getStackTrace();
+			newException.setStackTrace(Arrays.copyOfRange(stackTrace, 3, stackTrace.length));
+		}
+
+		return newException;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 344f96b..0c41f05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -98,6 +98,8 @@ public class AkkaRpcService implements RpcService {
 	private final String address;
 	private final int port;
 
+	private final boolean captureAskCallstacks;
+
 	private final ScheduledExecutor internalScheduledExecutor;
 
 	private final CompletableFuture<Void> terminationFuture;
@@ -122,6 +124,8 @@ public class AkkaRpcService implements RpcService {
 			port = -1;
 		}
 
+		captureAskCallstacks = configuration.captureAskCallStack();
+
 		internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
 
 		terminationFuture = new CompletableFuture<>();
@@ -165,7 +169,8 @@ public class AkkaRpcService implements RpcService {
 					actorRef,
 					configuration.getTimeout(),
 					configuration.getMaximumFramesize(),
-					null);
+					null,
+					captureAskCallstacks);
 			});
 	}
 
@@ -185,7 +190,8 @@ public class AkkaRpcService implements RpcService {
 					configuration.getTimeout(),
 					configuration.getMaximumFramesize(),
 					null,
-					() -> fencingToken);
+					() -> fencingToken,
+					captureAskCallstacks);
 			});
 	}
 
@@ -247,7 +253,8 @@ public class AkkaRpcService implements RpcService {
 				configuration.getTimeout(),
 				configuration.getMaximumFramesize(),
 				terminationFuture,
-				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
+				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
+				captureAskCallstacks);
 
 			implementedRpcGateways.add(FencedMainThreadExecutable.class);
 		} else {
@@ -257,7 +264,8 @@ public class AkkaRpcService implements RpcService {
 				actorRef,
 				configuration.getTimeout(),
 				configuration.getMaximumFramesize(),
-				terminationFuture);
+				terminationFuture,
+				captureAskCallstacks);
 		}
 
 		// Rather than using the System ClassLoader directly, we derive the ClassLoader
@@ -285,7 +293,8 @@ public class AkkaRpcService implements RpcService {
 				configuration.getTimeout(),
 				configuration.getMaximumFramesize(),
 				null,
-				() -> fencingToken);
+				() -> fencingToken,
+				captureAskCallstacks);
 
 			// Rather than using the System ClassLoader directly, we derive the ClassLoader
 			// from this class . That works better in cases where Flink runs embedded and all Flink
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
index 0c478a9..91b5a07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.rpc.akka;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 
@@ -38,11 +39,19 @@ public class AkkaRpcServiceConfiguration {
 
 	private final long maximumFramesize;
 
-	public AkkaRpcServiceConfiguration(@Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize) {
+	private final boolean captureAskCallStack;
+
+	public AkkaRpcServiceConfiguration(
+			@Nonnull Configuration configuration,
+			@Nonnull Time timeout,
+			long maximumFramesize,
+			boolean captureAskCallStack) {
+
 		checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
 		this.configuration = configuration;
 		this.timeout = timeout;
 		this.maximumFramesize = maximumFramesize;
+		this.captureAskCallStack = captureAskCallStack;
 	}
 
 	@Nonnull
@@ -59,12 +68,18 @@ public class AkkaRpcServiceConfiguration {
 		return maximumFramesize;
 	}
 
+	public boolean captureAskCallStack() {
+		return captureAskCallStack;
+	}
+
 	public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
 		final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
 
 		final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
 
-		return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize);
+		final boolean captureAskCallStacks = configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK);
+
+		return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize, captureAskCallStacks);
 	}
 
 	public static AkkaRpcServiceConfiguration defaultConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
index 564b1ef..1a15fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -61,8 +61,9 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
 			Time timeout,
 			long maximumFramesize,
 			@Nullable CompletableFuture<Void> terminationFuture,
-			Supplier<F> fencingTokenSupplier) {
-		super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
+			Supplier<F> fencingTokenSupplier,
+			boolean captureAskCallStacks) {
+		super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, captureAskCallStacks);
 
 		this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 553b07c..3a1632c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -51,7 +51,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
-import akka.pattern.AskTimeoutException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -63,9 +62,11 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -249,7 +250,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				firstFuture.get();
 				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
 			} catch (Exception e) {
-				assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class));
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertThat(cause, instanceOf(TimeoutException.class));
+				assertThat(cause.getMessage(), containsString("ResourceManagerGateway.registerTaskExecutor"));
 			}
 
 			startConnection.await();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
new file mode 100644
index 0000000..4457026
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.IOUtils;
+
+import akka.actor.ActorSystem;
+import akka.actor.Terminated;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that ask timeouts report the call stack of the calling function.
+ */
+public class TimeoutCallStackTest {
+
+	private static ActorSystem actorSystem;
+	private static RpcService rpcService;
+
+	private final List<RpcEndpoint> endpointsToStop = new ArrayList<>();
+
+	@BeforeClass
+	public static void setup() {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+		rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+
+		final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService();
+		final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
+
+		FutureUtils
+			.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
+			.get(10_000, TimeUnit.MILLISECONDS);
+	}
+
+	@After
+	public void stopTestEndpoints() {
+		endpointsToStop.forEach(IOUtils::closeQuietly);
+	}
+
+	@Test
+	public void testTimeoutException() throws Exception {
+		final TestingGateway gateway = createTestingGateway();
+
+		final CompletableFuture<Void> future = gateway.callThatTimesOut(Time.milliseconds(1));
+
+		Throwable failureCause = null;
+		try {
+			future.get();
+			fail("test buggy: the call should never have completed");
+		} catch (ExecutionException e) {
+			failureCause = e.getCause();
+		}
+
+		assertThat(failureCause, instanceOf(TimeoutException.class));
+		assertThat(failureCause.getMessage(), containsString("callThatTimesOut"));
+		assertThat(failureCause.getStackTrace()[0].getMethodName(), equalTo("testTimeoutException"));
+	}
+
+	// ------------------------------------------------------------------------
+	//  setup helpers
+	// ------------------------------------------------------------------------
+
+	private TestingGateway createTestingGateway() throws Exception {
+		final TestingRpcEndpoint endpoint = new TestingRpcEndpoint(rpcService, "test_name");
+		endpointsToStop.add(endpoint);
+		endpoint.start();
+
+		return rpcService.connect(endpoint.getAddress(), TestingGateway.class).get();
+	}
+
+	// ------------------------------------------------------------------------
+	//  testing mocks / stubs
+	// ------------------------------------------------------------------------
+
+	private interface TestingGateway extends RpcGateway {
+
+		CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout);
+	}
+
+	private static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway {
+
+		TestingRpcEndpoint(RpcService rpcService, String endpointId) {
+			super(rpcService, endpointId);
+		}
+
+		@Override
+		public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout) {
+			// return a future that never completes, so the call is guaranteed to time out
+			return new CompletableFuture<>();
+		}
+	}
+}