You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/16 16:57:14 UTC

[flink] branch master updated: [FLINK-18163][task] Add RecordWriter.volatileFlusherException

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fa0a99  [FLINK-18163][task] Add RecordWriter.volatileFlusherException
9fa0a99 is described below

commit 9fa0a99e5886fbebf4c02d5194ac047994e883ce
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Jul 6 10:42:46 2020 +0200

    [FLINK-18163][task] Add RecordWriter.volatileFlusherException
---
 .../flink/runtime/io/network/api/writer/RecordWriter.java   | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index be40d8d..49c79bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -90,6 +90,9 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 
 	/** To avoid synchronization overhead on the critical path, best-effort error tracking is enough here.*/
 	private Throwable flusherException;
+	private volatile Throwable volatileFlusherException;
+	private int volatileFlusherExceptionCheckSkipCount;
+	private static final int VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT = 100;
 
 	RecordWriter(ResultPartitionWriter writer, long timeout, String taskName) {
 		this.targetPartition = writer;
@@ -271,12 +274,18 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 		if (flusherException == null) {
 			LOG.error("An exception happened while flushing the outputs", t);
 			flusherException = t;
+			volatileFlusherException = t;
 		}
 	}
 
 	protected void checkErroneous() throws IOException {
-		if (flusherException != null) {
-			throw new IOException("An exception happened while flushing the outputs", flusherException);
+		// For performance reasons, we are not checking volatile field every single time.
+		if (flusherException != null ||
+				(volatileFlusherExceptionCheckSkipCount >= VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT && volatileFlusherException != null)) {
+			throw new IOException("An exception happened while flushing the outputs", volatileFlusherException);
+		}
+		if (++volatileFlusherExceptionCheckSkipCount >= VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT) {
+			volatileFlusherExceptionCheckSkipCount = 0;
 		}
 	}