You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/30 13:33:02 UTC

[3/3] flink git commit: Remove polluting log message in ContinuousFileReaderOperator

Remove polluting log message in ContinuousFileReaderOperator

Before, when snapshotting, we printed a log message about the file
input format not being checkpointable even when the current split was
"null". Now, we only print the message when there actually is a current
split and the format is not checkpointable.

This closes #2174


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c6b17b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c6b17b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c6b17b4

Branch: refs/heads/master
Commit: 6c6b17b4d47d281b0e5dcf4413fd1ad53ce49eee
Parents: a9733a9
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Jun 30 11:46:52 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 30 15:31:10 2016 +0200

----------------------------------------------------------------------
 .../source/ContinuousFileReaderOperator.java    | 33 ++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c6b17b4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 1c2da34..0daa7ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -65,6 +65,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
 	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
 	private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
@@ -75,7 +77,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 	private FileInputFormat<OUT> format;
 	private TypeSerializer<OUT> serializer;
 
-	private Object checkpointLock;
+	private transient Object checkpointLock;
 
 	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
 
@@ -259,7 +261,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 							}
 
 							if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
-								((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
+
+								@SuppressWarnings("unchecked")
+								CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+										(CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+								checkpointableFormat.reopen(currentSplit, restoredFormatState);
 							} else {
 								// this is the case of a non-checkpointable input format that will reprocess the last split.
 								LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
@@ -342,14 +349,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 				this.pendingSplits.remove();
 			}
 
-			if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
-				S formatState = this.isSplitOpen ?
-					(S) ((CheckpointableInputFormat) format).getCurrentState() :
-					restoredFormatState;
-				return new Tuple3<>(snapshot, currentSplit, formatState);
+			if (this.currentSplit != null) {
+				if (this.format instanceof CheckpointableInputFormat) {
+					@SuppressWarnings("unchecked")
+					CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+							(CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+					S formatState = this.isSplitOpen ?
+							checkpointableFormat.getCurrentState() :
+							restoredFormatState;
+					return new Tuple3<>(snapshot, currentSplit, formatState);
+				} else {
+					LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+					return new Tuple3<>(snapshot, currentSplit, null);
+				}
 			} else {
-				LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
-				return new Tuple3<>(snapshot, currentSplit, null);
+				return new Tuple3<>(snapshot, null, null);
 			}
 		}