You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:48 UTC

[41/50] [abbrv] flink git commit: [FLINK-2067] [runtime] Unwrap the ExceptionInChainedOperatorException exceptions to clean up stack traces

[FLINK-2067] [runtime] Unwrap the ExceptionInChainedOperatorException exceptions to clean up stack traces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6181302f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6181302f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6181302f

Branch: refs/heads/table-retraction
Commit: 6181302f1ab741b86af357e4513f5952a5fc1531
Parents: c9623be
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 2 22:48:08 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 2 23:11:03 2017 +0200

----------------------------------------------------------------------
 .../flink/util/WrappingRuntimeException.java    | 54 +++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  7 +++
 .../flink/runtime/taskmanager/TaskTest.java     | 57 ++++++++++++++++++--
 .../ExceptionInChainedOperatorException.java    | 11 +---
 .../streaming/runtime/tasks/OperatorChain.java  |  4 +-
 5 files changed, 120 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java b/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
new file mode 100644
index 0000000..f9306df
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingRuntimeException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A runtime exception that is explicitly used to wrap non-runtime exceptions.
+ * 
+ * <p>The exception is recognized (for example by the Task when reporting exceptions as
+ * failure causes) and unwrapped to avoid including the wrapper's stack trace in the reports.
+ * That way, exception traces are keeping to the important parts.
+ */
+public class WrappingRuntimeException extends FlinkRuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	public WrappingRuntimeException(@Nonnull Throwable cause) {
+		super(checkNotNull(cause));
+	}
+
+	public WrappingRuntimeException(String message, @Nonnull Throwable cause) {
+		super(message, checkNotNull(cause));
+	}
+
+	/**
+	 * Recursively unwraps this WrappingRuntimeException and its causes, getting the first
+	 * non wrapping exception.
+	 * 
+	 * @return The first cause that is not a wrapping exception.
+	 */
+	public Throwable unwrap() {
+		Throwable cause = getCause();
+		return (cause instanceof WrappingRuntimeException) ? ((WrappingRuntimeException) cause).unwrap() : cause;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dab0f95..e626dae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -71,6 +71,8 @@ import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.util.WrappingRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -727,6 +729,11 @@ public class Task implements Runnable, TaskActions {
 		}
 		catch (Throwable t) {
 
+			// unwrap wrapped exceptions to make stack traces more compact
+			if (t instanceof WrappingRuntimeException) {
+				t = ((WrappingRuntimeException) t).unwrap();
+			}
+
 			// ----------------------------------------------------------------
 			// the execution failed. either the invokable code properly failed, or
 			// an exception was thrown as a side effect of cancelling

http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 2522287..56a3b07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -59,11 +59,13 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.WrappingRuntimeException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
+import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URL;
@@ -117,9 +119,9 @@ public class TaskTest extends TestLogger {
 	
 	@Before
 	public void createQueuesAndActors() {
-		taskManagerMessages = new LinkedBlockingQueue<Object>();
-		jobManagerMessages = new LinkedBlockingQueue<Object>();
-		listenerMessages = new LinkedBlockingQueue<Object>();
+		taskManagerMessages = new LinkedBlockingQueue<>();
+		jobManagerMessages = new LinkedBlockingQueue<>();
+		listenerMessages = new LinkedBlockingQueue<>();
 		taskManagerGateway = new ForwardingActorGateway(taskManagerMessages);
 		jobManagerGateway = new ForwardingActorGateway(jobManagerMessages);
 		listenerGateway = new ForwardingActorGateway(listenerMessages);
@@ -335,6 +337,32 @@ public class TaskTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testFailWithWrappedException() {
+		try {
+			Task task = createTask(FailingInvokableWithChainedException.class);
+			task.registerExecutionListener(listener);
+
+			task.run();
+
+			assertEquals(ExecutionState.FAILED, task.getExecutionState());
+			assertTrue(task.isCanceledOrFailed());
+
+			Throwable cause = task.getFailureCause();
+			assertTrue(cause instanceof IOException);
+
+			validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
+			validateUnregisterTask(task.getExecutionId());
+
+			validateListenerMessage(ExecutionState.RUNNING, task, false);
+			validateListenerMessage(ExecutionState.FAILED, task, true);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 	
 	@Test
 	public void testCancelDuringInvoke() {
@@ -1232,4 +1260,27 @@ public class TaskTest extends TestLogger {
 		public void cancel() throws Exception {
 		}
 	}
+
+	public static final class FailingInvokableWithChainedException extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			throw new TestWrappedException(new IOException("test"));
+		}
+
+		@Override
+		public void cancel() {}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test exceptions
+	// ------------------------------------------------------------------------
+
+	private static class TestWrappedException extends WrappingRuntimeException {
+		private static final long serialVersionUID = 1L;
+
+		public TestWrappedException(@Nonnull Throwable cause) {
+			super(cause);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
index 77c80c9..d4027bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
@@ -21,12 +21,13 @@ package org.apache.flink.streaming.runtime.tasks;
 import static java.util.Objects.requireNonNull;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.WrappingRuntimeException;
 
 /**
  * A special exception that signifies that the cause exception came from a chained operator.
  */
 @Internal
-public class ExceptionInChainedOperatorException extends RuntimeException {
+public class ExceptionInChainedOperatorException extends WrappingRuntimeException {
 
 	private static final long serialVersionUID = 1L;
 
@@ -37,12 +38,4 @@ public class ExceptionInChainedOperatorException extends RuntimeException {
 	public ExceptionInChainedOperatorException(String message, Throwable cause) {
 		super(message, requireNonNull(cause));
 	}
-
-	public Throwable getOriginalCause() {
-		Throwable ex = this;
-		do {
-			ex = ex.getCause();
-		} while (ex instanceof ExceptionInChainedOperatorException);
-		return ex;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6181302f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b85461d..870c2ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -54,6 +55,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.XORShiftRandom;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -525,7 +527,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				operator.setKeyContextElement1(copy);
 				operator.processElement(copy);
 			} catch (Exception e) {
-				throw new RuntimeException("Could not forward element to next operator", e);
+				throw new ExceptionInChainedOperatorException(e);
 			}
 
 		}