You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/06/22 15:44:46 UTC

[flink] branch master updated: [FLINK-17769] Wrong order of log events on a task failure

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ec4d155  [FLINK-17769] Wrong order of log events on a task failure
ec4d155 is described below

commit ec4d155101faa2c1979c4981457e63d657068169
Author: Yuan Mei <yu...@gmail.com>
AuthorDate: Wed Jun 17 13:04:56 2020 +0800

    [FLINK-17769] Wrong order of log events on a task failure
    
    When a task fails, errors thrown while disposing of an operator are logged
    before the real root cause is printed, which is confusing.
    
    This fix suppresses exceptions thrown while disposing of operators and
    attaches them to the root cause as suppressed exceptions.
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 108 ++++++++++-----------
 .../streaming/runtime/tasks/StreamTaskTest.java    |  30 ++++--
 2 files changed, 76 insertions(+), 62 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a7d04d2..e65537f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -541,8 +541,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			try {
 				cleanUpInvoke();
 			}
+			// TODO: investigate why Throwable instead of Exception is used here.
 			catch (Throwable cleanUpException) {
-				throw (Exception) ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
+				Throwable throwable = ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
+				throw (throwable instanceof Exception ? (Exception) throwable : new Exception(throwable));
 			}
 			throw invokeException;
 		}
@@ -593,8 +595,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 		// make an attempt to dispose the operators such that failures in the dispose call
 		// still let the computation fail
-		disposeAllOperators(false);
-		disposedOperators = true;
+		disposeAllOperators();
 	}
 
 	protected void cleanUpInvoke() throws Exception {
@@ -612,45 +613,28 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		Thread.interrupted();
 
 		// stop all timers and threads
-		tryShutdownTimerService();
+		Exception suppressedException = runAndSuppressThrowable(this::tryShutdownTimerService, null);
 
 		// stop all asynchronous checkpoint threads
-		try {
-			cancelables.close();
-			shutdownAsyncThreads();
-		} catch (Throwable t) {
-			// catch and log the exception to not replace the original exception
-			LOG.error("Could not shut down async checkpoint threads", t);
-		}
+		suppressedException = runAndSuppressThrowable(cancelables::close, suppressedException);
+		suppressedException = runAndSuppressThrowable(this::shutdownAsyncThreads, suppressedException);
 
 		// we must! perform this cleanup
-		try {
-			cleanup();
-		} catch (Throwable t) {
-			// catch and log the exception to not replace the original exception
-			LOG.error("Error during cleanup of stream task", t);
-		}
+		suppressedException = runAndSuppressThrowable(this::cleanup, suppressedException);
 
 		// if the operators were not disposed before, do a hard dispose
-		disposeAllOperators(true);
+		suppressedException = runAndSuppressThrowable(this::disposeAllOperators, suppressedException);
 
 		// release the output resources. this method should never fail.
-		if (operatorChain != null) {
-			// beware: without synchronization, #performCheckpoint() may run in
-			//         parallel and this call is not thread-safe
-			actionExecutor.run(() -> operatorChain.releaseOutputs());
-		} else {
-			// failed to allocate operatorChain, clean up record writers
-			recordWriter.close();
-		}
+		suppressedException = runAndSuppressThrowable(this::releaseOutputResources, suppressedException);
 
-		try {
-			channelIOExecutor.shutdown();
-		} catch (Throwable t) {
-			LOG.error("Error during shutdown the channel state unspill executor", t);
-		}
+		suppressedException = runAndSuppressThrowable(channelIOExecutor::shutdown, suppressedException);
 
-		mailboxProcessor.close();
+		suppressedException = runAndSuppressThrowable(mailboxProcessor::close, suppressedException);
+
+		if (suppressedException != null) {
+			throw suppressedException;
+		}
 	}
 
 	@Override
@@ -687,27 +671,49 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
+	private void releaseOutputResources() throws Exception {
+		if (operatorChain != null) {
+			// beware: without synchronization, #performCheckpoint() may run in
+			//         parallel and this call is not thread-safe
+			actionExecutor.run(() -> operatorChain.releaseOutputs());
+		} else {
+			// failed to allocate operatorChain, clean up record writers
+			recordWriter.close();
+		}
+	}
+
+	private Exception runAndSuppressThrowable(ThrowingRunnable<?> runnable, @Nullable Exception originalException) {
+		try {
+			runnable.run();
+		} catch (Throwable t) {
+			// TODO: investigate why Throwable instead of Exception is used here.
+			Exception e = t instanceof Exception ? (Exception) t : new Exception(t);
+			return ExceptionUtils.firstOrSuppressed(e, originalException);
+		}
+
+		return originalException;
+	}
+
 	/**
 	 * Execute @link StreamOperator#dispose()} of each operator in the chain of this
 	 * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
 	 */
-	private void disposeAllOperators(boolean logOnlyErrors) throws Exception {
+	private void disposeAllOperators() throws Exception {
 		if (operatorChain != null && !disposedOperators) {
+			Exception disposalException = null;
 			for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
 				StreamOperator<?> operator = operatorWrapper.getStreamOperator();
-				if (!logOnlyErrors) {
+				try {
 					operator.dispose();
 				}
-				else {
-					try {
-						operator.dispose();
-					}
-					catch (Exception e) {
-						LOG.error("Error during disposal of stream operator.", e);
-					}
+				catch (Exception e) {
+					disposalException = ExceptionUtils.firstOrSuppressed(e, disposalException);
 				}
 			}
 			disposedOperators = true;
+			if (disposalException != null) {
+				throw disposalException;
+			}
 		}
 	}
 
@@ -964,20 +970,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	private void tryShutdownTimerService() {
-
 		if (!timerService.isTerminated()) {
-
-			try {
-				final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
-					getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
-
-				if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
-					LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
-						"timers. Will continue with shutdown procedure.", timeoutMs);
-				}
-			} catch (Throwable t) {
-				// catch and log the exception to not replace the original exception
-				LOG.error("Could not shut down timer service", t);
+			final long timeoutMs = getEnvironment()
+				.getTaskManagerInfo()
+				.getConfiguration()
+				.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
+			if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
+				LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
+					"timers. Will continue with shutdown procedure.", timeoutMs);
 			}
 		}
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 833f07b..020aee4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -135,7 +135,6 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.StreamCorruptedException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -152,6 +151,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
+import static java.util.Arrays.asList;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -209,7 +209,15 @@ public class StreamTaskTest extends TestLogger {
 			testHarness.waitForTaskCompletion();
 		}
 		catch (Exception ex) {
-			if (!ExceptionUtils.findThrowable(ex, ExpectedTestException.class).isPresent()) {
+			// make sure the original exception is the cause and not wrapped
+			if (!(ex.getCause() instanceof ExpectedTestException)) {
+				throw ex;
+			}
+			// make sure DisposeException is the only suppressed exception
+			if (ex.getCause().getSuppressed().length != 1) {
+				throw ex;
+			}
+			if (!(ex.getCause().getSuppressed()[0] instanceof FailingTwiceOperator.DisposeException)) {
 				throw ex;
 			}
 		}
@@ -225,7 +233,13 @@ public class StreamTaskTest extends TestLogger {
 
 		@Override
 		public void dispose() throws Exception {
-			fail("This exception should be suppressed");
+			throw new DisposeException();
+		}
+
+		class DisposeException extends Exception {
+			public DisposeException() {
+				super("Dispose Exception. This exception should be suppressed");
+			}
 		}
 	}
 
@@ -818,7 +832,7 @@ public class StreamTaskTest extends TestLogger {
 		task.streamTask.cancel();
 
 		final FutureUtils.ConjunctFuture<Void> discardFuture = FutureUtils.waitForAll(
-			Arrays.asList(
+			asList(
 				managedKeyedStateHandle.getDiscardFuture(),
 				rawKeyedStateHandle.getDiscardFuture(),
 				managedOperatorStateHandle.getDiscardFuture(),
@@ -1059,8 +1073,8 @@ public class StreamTaskTest extends TestLogger {
 		}
 
 		MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
-		mockEnvironment.addOutputs(Arrays.asList(partitions));
-		mockEnvironment.addInputs(Arrays.asList(gates));
+		mockEnvironment.addOutputs(asList(partitions));
+		mockEnvironment.addInputs(asList(gates));
 		StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build();
 		try {
 			verifyResults(gates, partitions, false, false);
@@ -1095,8 +1109,8 @@ public class StreamTaskTest extends TestLogger {
 			NoOpCheckpointResponder.INSTANCE,
 			reader);
 		MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskStateManager(taskStateManager).build();
-		mockEnvironment.addOutputs(Arrays.asList(partitions));
-		mockEnvironment.addInputs(Arrays.asList(gates));
+		mockEnvironment.addOutputs(asList(partitions));
+		mockEnvironment.addInputs(asList(gates));
 		StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build();
 		try {
 			verifyResults(gates, partitions, false, false);