You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/16 16:57:14 UTC
[flink] branch master updated: [FLINK-18163][task] Add
RecordWriter.volatileFlusherException
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9fa0a99 [FLINK-18163][task] Add RecordWriter.volatileFlusherException
9fa0a99 is described below
commit 9fa0a99e5886fbebf4c02d5194ac047994e883ce
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Jul 6 10:42:46 2020 +0200
[FLINK-18163][task] Add RecordWriter.volatileFlusherException
---
.../flink/runtime/io/network/api/writer/RecordWriter.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index be40d8d..49c79bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -90,6 +90,9 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
/** To avoid synchronization overhead on the critical path, best-effort error tracking is enough here.*/
private Throwable flusherException;
+ private volatile Throwable volatileFlusherException;
+ private int volatileFlusherExceptionCheckSkipCount;
+ private static final int VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT = 100;
RecordWriter(ResultPartitionWriter writer, long timeout, String taskName) {
this.targetPartition = writer;
@@ -271,12 +274,18 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
if (flusherException == null) {
LOG.error("An exception happened while flushing the outputs", t);
flusherException = t;
+ volatileFlusherException = t;
}
}
protected void checkErroneous() throws IOException {
- if (flusherException != null) {
- throw new IOException("An exception happened while flushing the outputs", flusherException);
+ // For performance reasons, we are not checking volatile field every single time.
+ if (flusherException != null ||
+ (volatileFlusherExceptionCheckSkipCount >= VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT && volatileFlusherException != null)) {
+ throw new IOException("An exception happened while flushing the outputs", volatileFlusherException);
+ }
+ if (++volatileFlusherExceptionCheckSkipCount >= VOLATILE_FLUSHER_EXCEPTION_MAX_CHECK_SKIP_COUNT) {
+ volatileFlusherExceptionCheckSkipCount = 0;
}
}