You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/30 13:33:02 UTC
[3/3] flink git commit: Remove polluting log message in
ContinuousFileReaderOperator
Remove polluting log message in ContinuousFileReaderOperator
Before, when snapshotting, we printed a log message about the file
input format not being checkpointable when the current split was
"null". Now, we only print the message when there is a current split
and the format is actually not checkpointable.
This closes #2174
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c6b17b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c6b17b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c6b17b4
Branch: refs/heads/master
Commit: 6c6b17b4d47d281b0e5dcf4413fd1ad53ce49eee
Parents: a9733a9
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Jun 30 11:46:52 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 30 15:31:10 2016 +0200
----------------------------------------------------------------------
.../source/ContinuousFileReaderOperator.java | 33 ++++++++++++++------
1 file changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c6b17b4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 1c2da34..0daa7ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -65,6 +65,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
@@ -75,7 +77,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private FileInputFormat<OUT> format;
private TypeSerializer<OUT> serializer;
- private Object checkpointLock;
+ private transient Object checkpointLock;
private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
@@ -259,7 +261,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
- ((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
+
+ @SuppressWarnings("unchecked")
+ CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+ (CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+ checkpointableFormat.reopen(currentSplit, restoredFormatState);
} else {
// this is the case of a non-checkpointable input format that will reprocess the last split.
LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
@@ -342,14 +349,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.pendingSplits.remove();
}
- if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
- S formatState = this.isSplitOpen ?
- (S) ((CheckpointableInputFormat) format).getCurrentState() :
- restoredFormatState;
- return new Tuple3<>(snapshot, currentSplit, formatState);
+ if (this.currentSplit != null) {
+ if (this.format instanceof CheckpointableInputFormat) {
+ @SuppressWarnings("unchecked")
+ CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+ (CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+ S formatState = this.isSplitOpen ?
+ checkpointableFormat.getCurrentState() :
+ restoredFormatState;
+ return new Tuple3<>(snapshot, currentSplit, formatState);
+ } else {
+ LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+ return new Tuple3<>(snapshot, currentSplit, null);
+ }
} else {
- LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
- return new Tuple3<>(snapshot, currentSplit, null);
+ return new Tuple3<>(snapshot, null, null);
}
}