You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/05/02 21:47:30 UTC
[8/8] flink git commit: [FLINK-2067] [runtime] Unwrap the
ExceptionInChainedOperatorException exceptions to clean up stack traces
[FLINK-2067] [runtime] Unwrap the ExceptionInChainedOperatorException exceptions to clean up stack traces
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6181302f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6181302f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6181302f
Branch: refs/heads/master
Commit: 6181302f1ab741b86af357e4513f5952a5fc1531
Parents: c9623be
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 2 22:48:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 2 23:11:03 2017 +0200
----------------------------------------------------------------------
.../flink/util/WrappingRuntimeException.java | 54 +++++++++++++++++++
.../apache/flink/runtime/taskmanager/Task.java | 7 +++
.../flink/runtime/taskmanager/TaskTest.java | 57 ++++++++++++++++++--
.../ExceptionInChainedOperatorException.java | 11 +---
.../streaming/runtime/tasks/OperatorChain.java | 4 +-
5 files changed, 120 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java b/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
new file mode 100644
index 0000000..f9306df
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A runtime exception that is explicitly used to wrap non-runtime exceptions.
+ *
+ * <p>The exception is recognized (for example by the Task when reporting exceptions as
+ * failure causes) and unwrapped to avoid including the wrapper's stack trace in the reports.
+ * That way, exception traces are keeping to the important parts.
+ */
+public class WrappingRuntimeException extends FlinkRuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public WrappingRuntimeException(@Nonnull Throwable cause) {
+ super(checkNotNull(cause));
+ }
+
+ public WrappingRuntimeException(String message, @Nonnull Throwable cause) {
+ super(message, checkNotNull(cause));
+ }
+
+ /**
+ * Recursively unwraps this WrappingRuntimeException and its causes, getting the first
+ * non wrapping exception.
+ *
+ * @return The first cause that is not a wrapping exception.
+ */
+ public Throwable unwrap() {
+ Throwable cause = getCause();
+ return (cause instanceof WrappingRuntimeException) ? ((WrappingRuntimeException) cause).unwrap() : cause;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dab0f95..e626dae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -71,6 +71,8 @@ import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.util.WrappingRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -727,6 +729,11 @@ public class Task implements Runnable, TaskActions {
}
catch (Throwable t) {
+ // unwrap wrapped exceptions to make stack traces more compact
+ if (t instanceof WrappingRuntimeException) {
+ t = ((WrappingRuntimeException) t).unwrap();
+ }
+
// ----------------------------------------------------------------
// the execution failed. either the invokable code properly failed, or
// an exception was thrown as a side effect of cancelling
http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 2522287..56a3b07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -59,11 +59,13 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.WrappingRuntimeException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
+import javax.annotation.Nonnull;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
@@ -117,9 +119,9 @@ public class TaskTest extends TestLogger {
@Before
public void createQueuesAndActors() {
- taskManagerMessages = new LinkedBlockingQueue<Object>();
- jobManagerMessages = new LinkedBlockingQueue<Object>();
- listenerMessages = new LinkedBlockingQueue<Object>();
+ taskManagerMessages = new LinkedBlockingQueue<>();
+ jobManagerMessages = new LinkedBlockingQueue<>();
+ listenerMessages = new LinkedBlockingQueue<>();
taskManagerGateway = new ForwardingActorGateway(taskManagerMessages);
jobManagerGateway = new ForwardingActorGateway(jobManagerMessages);
listenerGateway = new ForwardingActorGateway(listenerMessages);
@@ -335,6 +337,32 @@ public class TaskTest extends TestLogger {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testFailWithWrappedException() {
+ try {
+ Task task = createTask(FailingInvokableWithChainedException.class);
+ task.registerExecutionListener(listener);
+
+ task.run();
+
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ assertTrue(task.isCanceledOrFailed());
+
+ Throwable cause = task.getFailureCause();
+ assertTrue(cause instanceof IOException);
+
+ validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+ validateUnregisterTask(task.getExecutionId());
+
+ validateListenerMessage(ExecutionState.RUNNING, task, false);
+ validateListenerMessage(ExecutionState.FAILED, task, true);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
@Test
public void testCancelDuringInvoke() {
@@ -1232,4 +1260,27 @@ public class TaskTest extends TestLogger {
public void cancel() throws Exception {
}
}
+
+ public static final class FailingInvokableWithChainedException extends AbstractInvokable {
+
+ @Override
+ public void invoke() throws Exception {
+ throw new TestWrappedException(new IOException("test"));
+ }
+
+ @Override
+ public void cancel() {}
+ }
+
+ // ------------------------------------------------------------------------
+ // test exceptions
+ // ------------------------------------------------------------------------
+
+ private static class TestWrappedException extends WrappingRuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public TestWrappedException(@Nonnull Throwable cause) {
+ super(cause);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
index 77c80c9..d4027bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
@@ -21,12 +21,13 @@ package org.apache.flink.streaming.runtime.tasks;
import static java.util.Objects.requireNonNull;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.WrappingRuntimeException;
/**
* A special exception that signifies that the cause exception came from a chained operator.
*/
@Internal
-public class ExceptionInChainedOperatorException extends RuntimeException {
+public class ExceptionInChainedOperatorException extends WrappingRuntimeException {
private static final long serialVersionUID = 1L;
@@ -37,12 +38,4 @@ public class ExceptionInChainedOperatorException extends RuntimeException {
public ExceptionInChainedOperatorException(String message, Throwable cause) {
super(message, requireNonNull(cause));
}
-
- public Throwable getOriginalCause() {
- Throwable ex = this;
- do {
- ex = ex.getCause();
- } while (ex instanceof ExceptionInChainedOperatorException);
- return ex;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b85461d..870c2ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -54,6 +55,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.XORShiftRandom;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -525,7 +527,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (Exception e) {
- throw new RuntimeException("Could not forward element to next operator", e);
+ throw new ExceptionInChainedOperatorException(e);
}
}