You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/18 08:04:03 UTC
[flink] 04/05: [FLINK-15966][runtime] Capture callstacks for RPC
ask() calls to improve exceptions.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2746e6a4f3f84e05425484ad63ea096c5536c77f
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 20:09:12 2020 +0100
[FLINK-15966][runtime] Capture callstacks for RPC ask() calls to improve exceptions.
---
docs/_includes/generated/akka_configuration.html | 6 +
.../apache/flink/configuration/AkkaOptions.java | 12 ++
.../runtime/rpc/akka/AkkaInvocationHandler.java | 72 ++++++++---
.../flink/runtime/rpc/akka/AkkaRpcService.java | 19 ++-
.../rpc/akka/AkkaRpcServiceConfiguration.java | 19 ++-
.../rpc/akka/FencedAkkaInvocationHandler.java | 5 +-
.../ResourceManagerTaskExecutorTest.java | 7 +-
.../runtime/rpc/akka/TimeoutCallStackTest.java | 135 +++++++++++++++++++++
8 files changed, 246 insertions(+), 29 deletions(-)
diff --git a/docs/_includes/generated/akka_configuration.html b/docs/_includes/generated/akka_configuration.html
index c77a393..2baeb6f 100644
--- a/docs/_includes/generated/akka_configuration.html
+++ b/docs/_includes/generated/akka_configuration.html
@@ -9,6 +9,12 @@
</thead>
<tbody>
<tr>
+ <td><h5>akka.ask.callstack</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>If true, call stack for asynchronous asks are captured. That way, when an ask fails (for example times out), you get a proper exception, describing to the original method call and call site. Note that in case of having millions of concurrent RPC calls, this may add to the memory footprint.</td>
+ </tr>
+ <tr>
<td><h5>akka.ask.timeout</h5></td>
<td style="word-wrap: break-word;">"10 s"</td>
<td>String</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index e71f18f..bdecb2e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -30,6 +30,18 @@ import static org.apache.flink.configuration.description.LinkElement.link;
public class AkkaOptions {
/**
+ * Whether RPC ask calls capture the caller's stack; set via {@code akka.ask.callstack}, enabled by default.
+ */
+ public static final ConfigOption<Boolean> CAPTURE_ASK_CALLSTACK = ConfigOptions
+ .key("akka.ask.callstack")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("If true, call stack for asynchronous asks are captured. That way, when an ask fails " +
+ "(for example times out), you get a proper exception, describing to the original method call and " +
+ "call site. Note that in case of having millions of concurrent RPC calls, this may add to the " +
+ "memory footprint.");
+
+ /**
* Timeout for akka ask calls.
*/
public static final ConfigOption<String> ASK_TIMEOUT = ConfigOptions
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 44bfb3b..3b2a2cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -47,6 +47,7 @@ import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
+import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -90,13 +91,16 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
@Nullable
private final CompletableFuture<Void> terminationFuture;
+ private final boolean captureAskCallStack;
+
AkkaInvocationHandler(
- String address,
- String hostname,
- ActorRef rpcEndpoint,
- Time timeout,
- long maximumFramesize,
- @Nullable CompletableFuture<Void> terminationFuture) {
+ String address,
+ String hostname,
+ ActorRef rpcEndpoint,
+ Time timeout,
+ long maximumFramesize,
+ @Nullable CompletableFuture<Void> terminationFuture,
+ boolean captureAskCallStack) {
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
@@ -105,6 +109,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
this.timeout = Preconditions.checkNotNull(timeout);
this.maximumFramesize = maximumFramesize;
this.terminationFuture = terminationFuture;
+ this.captureAskCallStack = captureAskCallStack;
}
@Override
@@ -208,20 +213,20 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
result = null;
} else {
+ // Capture the call stack. It is significantly faster to do that via an exception than
+ // via Thread.getStackTrace(), because an exception initially captures only a lightweight
+ // native pointer and converts it into the full stack trace lazily, when it is first needed.
+ final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;
+
// execute an asynchronous call
- CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
-
- CompletableFuture<?> completableFuture = resultFuture.thenApply((Object o) -> {
- if (o instanceof SerializedValue) {
- try {
- return ((SerializedValue<?>) o).deserializeValue(getClass().getClassLoader());
- } catch (IOException | ClassNotFoundException e) {
- throw new CompletionException(
- new RpcException("Could not deserialize the serialized payload of RPC method : "
- + methodName, e));
- }
+ final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
+
+ final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
+ resultFuture.whenComplete((resultValue, failure) -> {
+ if (failure != null) {
+ completableFuture.completeExceptionally(resolveTimeoutException(failure, callStackCapture, method));
} else {
- return o;
+ completableFuture.complete(deserializeValueIfNeeded(resultValue, method));
}
});
@@ -370,4 +375,35 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
public CompletableFuture<Void> getTerminationFuture() {
return terminationFuture;
}
+
+ static Object deserializeValueIfNeeded(Object o, Method method) {
+ if (o instanceof SerializedValue) {
+ try {
+ return ((SerializedValue<?>) o).deserializeValue(AkkaInvocationHandler.class.getClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new CompletionException(
+ new RpcException(
+ "Could not deserialize the serialized payload of RPC method : " + method.getName(), e));
+ }
+ } else {
+ return o;
+ }
+ }
+
+ /** Maps an Akka ask timeout to a {@link TimeoutException} naming {@code method} (other failures pass through);
+  * a non-null {@code callStackCapture} supplies the caller's stack trace for the new exception. */
+ static Throwable resolveTimeoutException(Throwable exception, @Nullable Throwable callStackCapture, Method method) {
+     if (!(exception instanceof akka.pattern.AskTimeoutException)) {
+         return exception;
+     }
+     final TimeoutException newException = new TimeoutException("Invocation of " + method + " timed out.");
+     newException.initCause(exception);
+     if (callStackCapture != null) {
+         // Remove the stack frames coming from the proxy interface invocation. The offset is clamped because
+         // a JVM may report fewer frames (even none), and copyOfRange throws when 'from' exceeds the length.
+         final StackTraceElement[] stackTrace = callStackCapture.getStackTrace();
+         newException.setStackTrace(Arrays.copyOfRange(stackTrace, Math.min(3, stackTrace.length), stackTrace.length));
+     }
+     return newException;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 344f96b..0c41f05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -98,6 +98,8 @@ public class AkkaRpcService implements RpcService {
private final String address;
private final int port;
+ private final boolean captureAskCallstacks;
+
private final ScheduledExecutor internalScheduledExecutor;
private final CompletableFuture<Void> terminationFuture;
@@ -122,6 +124,8 @@ public class AkkaRpcService implements RpcService {
port = -1;
}
+ captureAskCallstacks = configuration.captureAskCallStack();
+
internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
terminationFuture = new CompletableFuture<>();
@@ -165,7 +169,8 @@ public class AkkaRpcService implements RpcService {
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
- null);
+ null,
+ captureAskCallstacks);
});
}
@@ -185,7 +190,8 @@ public class AkkaRpcService implements RpcService {
configuration.getTimeout(),
configuration.getMaximumFramesize(),
null,
- () -> fencingToken);
+ () -> fencingToken,
+ captureAskCallstacks);
});
}
@@ -247,7 +253,8 @@ public class AkkaRpcService implements RpcService {
configuration.getTimeout(),
configuration.getMaximumFramesize(),
terminationFuture,
- ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
+ ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
+ captureAskCallstacks);
implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {
@@ -257,7 +264,8 @@ public class AkkaRpcService implements RpcService {
actorRef,
configuration.getTimeout(),
configuration.getMaximumFramesize(),
- terminationFuture);
+ terminationFuture,
+ captureAskCallstacks);
}
// Rather than using the System ClassLoader directly, we derive the ClassLoader
@@ -285,7 +293,8 @@ public class AkkaRpcService implements RpcService {
configuration.getTimeout(),
configuration.getMaximumFramesize(),
null,
- () -> fencingToken);
+ () -> fencingToken,
+ captureAskCallstacks);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
index 0c478a9..91b5a07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rpc.akka;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -38,11 +39,19 @@ public class AkkaRpcServiceConfiguration {
private final long maximumFramesize;
- public AkkaRpcServiceConfiguration(@Nonnull Configuration configuration, @Nonnull Time timeout, long maximumFramesize) {
+ private final boolean captureAskCallStack;
+
+ public AkkaRpcServiceConfiguration(
+ @Nonnull Configuration configuration,
+ @Nonnull Time timeout,
+ long maximumFramesize,
+ boolean captureAskCallStack) {
+
checkArgument(maximumFramesize > 0L, "Maximum framesize must be positive.");
this.configuration = configuration;
this.timeout = timeout;
this.maximumFramesize = maximumFramesize;
+ this.captureAskCallStack = captureAskCallStack;
}
@Nonnull
@@ -59,12 +68,18 @@ public class AkkaRpcServiceConfiguration {
return maximumFramesize;
}
+ public boolean captureAskCallStack() {
+ return captureAskCallStack;
+ }
+
public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
final long maximumFramesize = AkkaRpcServiceUtils.extractMaximumFramesize(configuration);
- return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize);
+ final boolean captureAskCallStacks = configuration.get(AkkaOptions.CAPTURE_ASK_CALLSTACK);
+
+ return new AkkaRpcServiceConfiguration(configuration, timeout, maximumFramesize, captureAskCallStacks);
}
public static AkkaRpcServiceConfiguration defaultConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
index 564b1ef..1a15fe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -61,8 +61,9 @@ public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInv
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Void> terminationFuture,
- Supplier<F> fencingTokenSupplier) {
- super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
+ Supplier<F> fencingTokenSupplier,
+ boolean captureAskCallStacks) {
+ super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, captureAskCallStacks);
this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 553b07c..3a1632c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -51,7 +51,6 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
-import akka.pattern.AskTimeoutException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -63,9 +62,11 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -249,7 +250,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
firstFuture.get();
fail("Should have failed because connection to taskmanager is delayed beyond timeout");
} catch (Exception e) {
- assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class));
+ final Throwable cause = ExceptionUtils.stripExecutionException(e);
+ assertThat(cause, instanceOf(TimeoutException.class));
+ assertThat(cause.getMessage(), containsString("ResourceManagerGateway.registerTaskExecutor"));
}
startConnection.await();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
new file mode 100644
index 0000000..4457026
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.util.IOUtils;
+
+import akka.actor.ActorSystem;
+import akka.actor.Terminated;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that ask timeouts report the call stack of the calling function.
+ */
+public class TimeoutCallStackTest {
+
+    private static ActorSystem actorSystem;
+    private static RpcService rpcService;
+
+    /** Endpoints started by a test; closed again in {@link #stopTestEndpoints()}. */
+    private final List<RpcEndpoint> startedEndpoints = new ArrayList<>();
+
+    @BeforeClass
+    public static void setup() {
+        actorSystem = AkkaUtils.createDefaultActorSystem();
+        rpcService = new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.defaultConfiguration());
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception {
+        // request both shutdowns first, then wait for them together
+        final CompletableFuture<Void> serviceStopped = rpcService.stopService();
+        final CompletableFuture<Terminated> systemStopped = FutureUtils.toJava(actorSystem.terminate());
+        final List<CompletableFuture<?>> shutdowns = Arrays.asList(serviceStopped, systemStopped);
+        FutureUtils.waitForAll(shutdowns).get(10_000, TimeUnit.MILLISECONDS);
+    }
+
+    @After
+    public void stopTestEndpoints() {
+        for (RpcEndpoint endpoint : startedEndpoints) {
+            IOUtils.closeQuietly(endpoint);
+        }
+    }
+
+    @Test
+    public void testTimeoutException() throws Exception {
+        final TestingGateway gateway = createTestingGateway();
+        // issued directly from this method, because the last assertion inspects the top stack frame
+        final CompletableFuture<Void> pendingCall = gateway.callThatTimesOut(Time.milliseconds(1));
+
+        try {
+            pendingCall.get();
+            fail("test buggy: the call should never have completed");
+        } catch (ExecutionException e) {
+            final Throwable failureCause = e.getCause();
+            assertThat(failureCause, instanceOf(TimeoutException.class));
+            assertThat(failureCause.getMessage(), containsString("callThatTimesOut"));
+            assertThat(failureCause.getStackTrace()[0].getMethodName(), equalTo("testTimeoutException"));
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // setup helpers
+    // ------------------------------------------------------------------------
+
+    /** Starts a new endpoint on the shared RPC service and connects a gateway to its address. */
+    private TestingGateway createTestingGateway() throws Exception {
+        final TestingRpcEndpoint endpoint = new TestingRpcEndpoint(rpcService, "test_name");
+        startedEndpoints.add(endpoint);
+        endpoint.start();
+
+        return rpcService.connect(endpoint.getAddress(), TestingGateway.class).get();
+    }
+
+    // ------------------------------------------------------------------------
+    // testing mocks / stubs
+    // ------------------------------------------------------------------------
+
+    /** Gateway with a single call; the timeout argument is marked with {@link RpcTimeout}. */
+    private interface TestingGateway extends RpcGateway {
+        CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout);
+    }
+
+    /** Endpoint whose only RPC method never produces a result. */
+    private static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway {
+        TestingRpcEndpoint(RpcService rpcService, String endpointId) {
+            super(rpcService, endpointId);
+        }
+
+        @Override
+        public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout) {
+            // never completed, which forces the ask to run into its timeout
+            return new CompletableFuture<>();
+        }
+    }
+}