Posted to commits@flink.apache.org by al...@apache.org on 2016/06/30 13:33:00 UTC

[1/3] flink git commit: Revert recent change to WrapperSetupHelperTest

Repository: flink
Updated Branches:
  refs/heads/master 5709bf69f -> 6c6b17b4d


Revert recent change to WrapperSetupHelperTest

@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) was changed
to @PowerMockIgnore("javax.*"). This broke builds on Travis.
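For context, PowerMock's @PowerMockIgnore lists the packages that should be loaded
by the system classloader instead of PowerMock's instrumenting classloader. The
reverted form defers only the JMX and JNDI packages, whereas the short-lived
"javax.*" pattern deferred the entire javax namespace. A minimal sketch of the
reverted test-class header follows; the imports, placeholder test method, and its
name are added here for illustration, and the real class additionally extends
flink-storm's AbstractTest (see the diff below):

package org.apache.flink.storm.wrappers;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest(WrapperSetupHelper.class)
// Reverted form: only javax.management.* and com.sun.jndi.* go to the system
// classloader; everything else remains visible to PowerMock.
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class WrapperSetupHelperTest {

	@Test
	public void wrapperSetupSketch() throws Exception {
		// test body omitted; the actual tests are unchanged by this commit
	}
}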


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9733a9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9733a9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9733a9a

Branch: refs/heads/master
Commit: a9733a9ae0b063ab51b5683f1d80c6e10ae2aefe
Parents: bd273a8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Jun 30 10:14:55 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 30 14:46:26 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9733a9a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index f37b547..82b12d6 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -53,7 +53,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WrapperSetupHelper.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+@PowerMockIgnore("javax.*")
 public class WrapperSetupHelperTest extends AbstractTest {
 
 	@Test


[2/3] flink git commit: [FLINK-4075] Fix unstable ContinuousFileProcessingCheckpointITCase

Posted by al...@apache.org.
[FLINK-4075] Fix unstable ContinuousFileProcessingCheckpointITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd273a8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd273a8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd273a8f

Branch: refs/heads/master
Commit: bd273a8f435b222eb67840fb39b854ec9ef8602f
Parents: 5709bf6
Author: kl0u <kk...@gmail.com>
Authored: Fri Jun 24 15:01:44 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 30 14:46:26 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java |   6 +-
 .../ContinuousFileMonitoringFunction.java       | 105 +++++++++----------
 .../source/ContinuousFileReaderOperator.java    |  27 +++--
 .../source/InputFormatSourceFunction.java       |   6 +-
 ...ontinuousFileProcessingCheckpointITCase.java |   2 +
 5 files changed, 78 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 87567e3..def9378 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -166,9 +166,9 @@ public class ContinuousFileMonitoringTest {
 			content.add(element.getValue() +"\n");
 		}
 
-		Assert.assertEquals(actualFileContents.size(), expectedFileContents.size());
+		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
 		for (Integer fileIdx: expectedFileContents.keySet()) {
-			Assert.assertTrue(actualFileContents.keySet().contains(fileIdx));
+			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
 
 			List<String> cntnt = actualFileContents.get(fileIdx);
 			Collections.sort(cntnt, new Comparator<String>() {
@@ -182,7 +182,7 @@ public class ContinuousFileMonitoringTest {
 			for (String line: cntnt) {
 				cntntStr.append(line);
 			}
-			Assert.assertEquals(cntntStr.toString(), expectedFileContents.get(fileIdx));
+			Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
 		}
 
 		for(org.apache.hadoop.fs.Path file: filesCreated) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index b97c274..8ff4a2a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
@@ -53,7 +52,7 @@ import java.util.Map;
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-	extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
+	extends RichSourceFunction<FileInputSplit> implements Checkpointed<Long> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -80,14 +79,12 @@ public class ContinuousFileMonitoringFunction<OUT>
 	/** Which new data to process (see {@link FileProcessingMode}. */
 	private final FileProcessingMode watchType;
 
-	private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
-
-	private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
-
-	private long globalModificationTime;
+	private Long globalModificationTime;
 
 	private FilePathFilter pathFilter;
 
+	private transient Object checkpointLock;
+
 	private volatile boolean isRunning = true;
 
 	public ContinuousFileMonitoringFunction(
@@ -113,7 +110,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 	@SuppressWarnings("unchecked")
 	public void open(Configuration parameters) throws Exception {
 		LOG.info("Opening File Monitoring Source.");
-		
+
 		super.open(parameters);
 		format.configure(parameters);
 	}
@@ -122,17 +119,28 @@ public class ContinuousFileMonitoringFunction<OUT>
 	public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
 		FileSystem fileSystem = FileSystem.get(new URI(path));
 
+		checkpointLock = context.getCheckpointLock();
 		switch (watchType) {
 			case PROCESS_CONTINUOUSLY:
 				while (isRunning) {
-					monitorDirAndForwardSplits(fileSystem, context);
+					synchronized (checkpointLock) {
+						monitorDirAndForwardSplits(fileSystem, context);
+					}
 					Thread.sleep(interval);
 				}
-				isRunning = false;
+
+				// here we do not need to set the running to false and the
+				// globalModificationTime to Long.MAX_VALUE because to arrive here,
+				// either close() or cancel() have already been called, so this
+				// is already done.
+
 				break;
 			case PROCESS_ONCE:
-				monitorDirAndForwardSplits(fileSystem, context);
-				isRunning = false;
+				synchronized (checkpointLock) {
+					monitorDirAndForwardSplits(fileSystem, context);
+					globalModificationTime = Long.MAX_VALUE;
+					isRunning = false;
+				}
 				break;
 			default:
 				isRunning = false;
@@ -141,41 +149,22 @@ public class ContinuousFileMonitoringFunction<OUT>
 	}
 
 	private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
-		final Object lock = context.getCheckpointLock();
+		assert (Thread.holdsLock(checkpointLock));
 
-		// it may be non-null in the case of a recovery after a failure.
-		if (currentSplitsToFwd != null) {
-			synchronized (lock) {
-				forwardSplits(currentSplitsToFwd, context);
-			}
-		}
-		currentSplitsToFwd = null;
-
-		// it may be non-null in the case of a recovery after a failure.
-		if (splitsToFwdOrderedAscByModTime == null) {
-			splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs);
-		}
-
-		Iterator<Tuple2<Long, List<FileInputSplit>>> it =
-			splitsToFwdOrderedAscByModTime.iterator();
+		List<Tuple2<Long, List<FileInputSplit>>> splitsByModTime = getInputSplitSortedOnModTime(fs);
 
+		Iterator<Tuple2<Long, List<FileInputSplit>>> it = splitsByModTime.iterator();
 		while (it.hasNext()) {
-			synchronized (lock) {
-				currentSplitsToFwd = it.next();
-				it.remove();
-				forwardSplits(currentSplitsToFwd, context);
-			}
+			forwardSplits(it.next(), context);
+			it.remove();
 		}
-
-		// set them to null to distinguish from a restore.
-		splitsToFwdOrderedAscByModTime = null;
-		currentSplitsToFwd = null;
 	}
 
 	private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
-		currentSplitsToFwd = splitsToFwd;
-		Long modTime = currentSplitsToFwd.f0;
-		List<FileInputSplit> splits = currentSplitsToFwd.f1;
+		assert (Thread.holdsLock(checkpointLock));
+
+		Long modTime = splitsToFwd.f0;
+		List<FileInputSplit> splits = splitsToFwd.f1;
 
 		Iterator<FileInputSplit> it = splits.iterator();
 		while (it.hasNext()) {
@@ -284,6 +273,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 	 * is the time of the most recent modification found in any of the already processed files.
 	 */
 	private boolean shouldIgnore(Path filePath, long modificationTime) {
+		assert (Thread.holdsLock(checkpointLock));
 		boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
 		if (shouldIgnore) {
 			LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
@@ -294,35 +284,36 @@ public class ContinuousFileMonitoringFunction<OUT>
 	@Override
 	public void close() throws Exception {
 		super.close();
-		isRunning = false;
+		synchronized (checkpointLock) {
+			globalModificationTime = Long.MAX_VALUE;
+			isRunning = false;
+		}
 		LOG.info("Closed File Monitoring Source.");
 	}
 
 	@Override
 	public void cancel() {
-		isRunning = false;
+		if (checkpointLock != null) {
+			// this is to cover the case where cancel() is called before the run()
+			synchronized (checkpointLock) {
+				globalModificationTime = Long.MAX_VALUE;
+				isRunning = false;
+			}
+		} else {
+			globalModificationTime = Long.MAX_VALUE;
+			isRunning = false;
+		}
 	}
 
 	//	---------------------			Checkpointing			--------------------------
 
 	@Override
-	public Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long> snapshotState(
-		long checkpointId, long checkpointTimestamp) throws Exception {
-
-		if (!isRunning) {
-			LOG.debug("snapshotState() called on closed source");
-			return null;
-		}
-		return new Tuple3<>(splitsToFwdOrderedAscByModTime,
-			currentSplitsToFwd, globalModificationTime);
+	public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		return globalModificationTime;
 	}
 
 	@Override
-	public void restoreState(Tuple3<List<Tuple2<Long, List<FileInputSplit>>>,
-		Tuple2<Long, List<FileInputSplit>>, Long> state) throws Exception {
-
-		this.splitsToFwdOrderedAscByModTime = state.f0;
-		this.currentSplitsToFwd = state.f1;
-		this.globalModificationTime = state.f2;
+	public void restoreState(Long state) throws Exception {
+		this.globalModificationTime = state;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 455c753..1c2da34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -104,7 +104,14 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		this.collector = new TimestampedCollector<>(output);
 		this.checkpointLock = getContainingTask().getCheckpointLock();
 
+		Preconditions.checkState(reader == null, "The reader is already initialized.");
+
 		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
+
+		// the readerState is needed for the initialization of the reader
+		// when recovering from a failure. So after the initialization,
+		// we can set it to null.
+		this.readerState = null;
 		this.reader.start();
 	}
 
@@ -191,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 		private volatile boolean isSplitOpen = false;
 
-		SplitReader(FileInputFormat<OT> format,
+		private SplitReader(FileInputFormat<OT> format,
 					TypeSerializer<OT> serializer,
 					TimestampedCollector<OT> collector,
 					Object checkpointLock,
@@ -212,18 +219,19 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 				S formatState = restoredState.f2;
 
 				for (FileInputSplit split : pending) {
+					Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
 					pendingSplits.add(split);
 				}
 
 				this.currentSplit = current;
 				this.restoredFormatState = formatState;
 			}
-			ContinuousFileReaderOperator.this.readerState = null;
 		}
 
-		void addSplit(FileInputSplit split) {
+		private void addSplit(FileInputSplit split) {
 			Preconditions.checkNotNull(split);
 			synchronized (checkpointLock) {
+				Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
 				this.pendingSplits.add(split);
 			}
 		}
@@ -323,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 			}
 		}
 
-		Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
+		private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
 			List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
 			for (FileInputSplit split: this.pendingSplits) {
 				snapshot.add(split);
@@ -334,9 +342,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 				this.pendingSplits.remove();
 			}
 
-			if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
-				S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
-				return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
+			if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
+				S formatState = this.isSplitOpen ?
+					(S) ((CheckpointableInputFormat) format).getCurrentState() :
+					restoredFormatState;
+				return new Tuple3<>(snapshot, currentSplit, formatState);
 			} else {
 				LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
 				return new Tuple3<>(snapshot, currentSplit, null);
@@ -405,6 +415,9 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		S formatState = (S) ois.readObject();
 
 		// set the whole reader state for the open() to find.
+		Preconditions.checkState(this.readerState == null,
+			"The reader state has already been initialized.");
+
 		this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
 		div.close();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index f35cbba..e3e5c54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -85,7 +85,11 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
 				
 				while (isRunning && !format.reachedEnd()) {
 					nextElement = format.nextRecord(nextElement);
-					ctx.collect(nextElement);
+					if (nextElement != null) {
+						ctx.collect(nextElement);
+					} else {
+						break;
+					}
 				}
 				format.close();
 				completedSplitsCounter.inc();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 4c0f648..d540a92 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -219,6 +219,8 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 			stream.write(line.getBytes());
 		}
 		stream.close();
+
+		Assert.assertTrue("Result file present", !fs.exists(file));
 		fs.rename(tmp, file);
 		Assert.assertTrue("No result file present", fs.exists(file));
 		return new Tuple2<>(file, str.toString());


[3/3] flink git commit: Remove polluting log message in ContinuousFileReaderOperator

Posted by al...@apache.org.
Remove polluting log message in ContinuousFileReaderOperator

Before, when snapshotting, we printed a log message about the file
input format not being checkpointable when the current split was
"null". Now, we only print the message when when appropriate.

This closes #2174
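The reworked getReaderState() (full diff below) makes that explicit: first check
whether a split is in flight at all, then check whether the format can checkpoint
it, and only log the "not checkpointable" notice in the latter case. A condensed
sketch of that decision, using the field and type names from the diff; the method
name is hypothetical and the body is not standalone, it belongs inside the
SplitReader:

	// Condensed sketch of the reworked snapshot decision; currentSplit, format,
	// isSplitOpen, restoredFormatState and S are the SplitReader members shown
	// in the diff below.
	private Tuple3<List<FileInputSplit>, FileInputSplit, S> snapshotDecision(
			List<FileInputSplit> snapshot) throws IOException {

		if (currentSplit == null) {
			// no split in flight: nothing to warn about and no format state to capture
			return new Tuple3<>(snapshot, null, null);
		}
		if (format instanceof CheckpointableInputFormat) {
			@SuppressWarnings("unchecked")
			CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
					(CheckpointableInputFormat<FileInputSplit, S>) format;

			// live state if the split is currently open, otherwise the state
			// restored from the last checkpoint
			S formatState = isSplitOpen ? checkpointableFormat.getCurrentState() : restoredFormatState;
			return new Tuple3<>(snapshot, currentSplit, formatState);
		}
		// only here, with a split in flight and a non-checkpointable format, is the notice warranted
		LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
		return new Tuple3<>(snapshot, currentSplit, null);
	}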


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c6b17b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c6b17b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c6b17b4

Branch: refs/heads/master
Commit: 6c6b17b4d47d281b0e5dcf4413fd1ad53ce49eee
Parents: a9733a9
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Jun 30 11:46:52 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 30 15:31:10 2016 +0200

----------------------------------------------------------------------
 .../source/ContinuousFileReaderOperator.java    | 33 ++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c6b17b4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 1c2da34..0daa7ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -65,6 +65,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
 	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
 
+	private static final long serialVersionUID = 1L;
+
 	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
 	private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
@@ -75,7 +77,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 	private FileInputFormat<OUT> format;
 	private TypeSerializer<OUT> serializer;
 
-	private Object checkpointLock;
+	private transient Object checkpointLock;
 
 	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
 
@@ -259,7 +261,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 							}
 
 							if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
-								((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
+
+								@SuppressWarnings("unchecked")
+								CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+										(CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+								checkpointableFormat.reopen(currentSplit, restoredFormatState);
 							} else {
 								// this is the case of a non-checkpointable input format that will reprocess the last split.
 								LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
@@ -342,14 +349,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 				this.pendingSplits.remove();
 			}
 
-			if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
-				S formatState = this.isSplitOpen ?
-					(S) ((CheckpointableInputFormat) format).getCurrentState() :
-					restoredFormatState;
-				return new Tuple3<>(snapshot, currentSplit, formatState);
+			if (this.currentSplit != null) {
+				if (this.format instanceof CheckpointableInputFormat) {
+					@SuppressWarnings("unchecked")
+					CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+							(CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+					S formatState = this.isSplitOpen ?
+							checkpointableFormat.getCurrentState() :
+							restoredFormatState;
+					return new Tuple3<>(snapshot, currentSplit, formatState);
+				} else {
+					LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+					return new Tuple3<>(snapshot, currentSplit, null);
+				}
 			} else {
-				LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
-				return new Tuple3<>(snapshot, currentSplit, null);
+				return new Tuple3<>(snapshot, null, null);
 			}
 		}