You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/30 13:33:00 UTC
[1/3] flink git commit: Revert recent change to WrapperSetupHelperTest
Repository: flink
Updated Branches:
refs/heads/master 5709bf69f -> 6c6b17b4d
Revert recent change to WrapperSetupHelperTest
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) was changed
to @PowerMockIgnore("javax.*"). This broke builds on Travis.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9733a9a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9733a9a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9733a9a
Branch: refs/heads/master
Commit: a9733a9ae0b063ab51b5683f1d80c6e10ae2aefe
Parents: bd273a8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Jun 30 10:14:55 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 30 14:46:26 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a9733a9a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index f37b547..82b12d6 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -53,7 +53,7 @@ import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WrapperSetupHelper.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+@PowerMockIgnore("javax.*")
public class WrapperSetupHelperTest extends AbstractTest {
@Test
[2/3] flink git commit: [FLINK-4075] Fix unstable
ContinuousFileProcessingCheckpointITCase
Posted by al...@apache.org.
[FLINK-4075] Fix unstable ContinuousFileProcessingCheckpointITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd273a8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd273a8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd273a8f
Branch: refs/heads/master
Commit: bd273a8f435b222eb67840fb39b854ec9ef8602f
Parents: 5709bf6
Author: kl0u <kk...@gmail.com>
Authored: Fri Jun 24 15:01:44 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 30 14:46:26 2016 +0200
----------------------------------------------------------------------
.../hdfstests/ContinuousFileMonitoringTest.java | 6 +-
.../ContinuousFileMonitoringFunction.java | 105 +++++++++----------
.../source/ContinuousFileReaderOperator.java | 27 +++--
.../source/InputFormatSourceFunction.java | 6 +-
...ontinuousFileProcessingCheckpointITCase.java | 2 +
5 files changed, 78 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 87567e3..def9378 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -166,9 +166,9 @@ public class ContinuousFileMonitoringTest {
content.add(element.getValue() +"\n");
}
- Assert.assertEquals(actualFileContents.size(), expectedFileContents.size());
+ Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
for (Integer fileIdx: expectedFileContents.keySet()) {
- Assert.assertTrue(actualFileContents.keySet().contains(fileIdx));
+ Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
List<String> cntnt = actualFileContents.get(fileIdx);
Collections.sort(cntnt, new Comparator<String>() {
@@ -182,7 +182,7 @@ public class ContinuousFileMonitoringTest {
for (String line: cntnt) {
cntntStr.append(line);
}
- Assert.assertEquals(cntntStr.toString(), expectedFileContents.get(fileIdx));
+ Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
}
for(org.apache.hadoop.fs.Path file: filesCreated) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index b97c274..8ff4a2a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
@@ -53,7 +52,7 @@ import java.util.Map;
*/
@Internal
public class ContinuousFileMonitoringFunction<OUT>
- extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
+ extends RichSourceFunction<FileInputSplit> implements Checkpointed<Long> {
private static final long serialVersionUID = 1L;
@@ -80,14 +79,12 @@ public class ContinuousFileMonitoringFunction<OUT>
/** Which new data to process (see {@link FileProcessingMode}. */
private final FileProcessingMode watchType;
- private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
-
- private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
-
- private long globalModificationTime;
+ private Long globalModificationTime;
private FilePathFilter pathFilter;
+ private transient Object checkpointLock;
+
private volatile boolean isRunning = true;
public ContinuousFileMonitoringFunction(
@@ -113,7 +110,7 @@ public class ContinuousFileMonitoringFunction<OUT>
@SuppressWarnings("unchecked")
public void open(Configuration parameters) throws Exception {
LOG.info("Opening File Monitoring Source.");
-
+
super.open(parameters);
format.configure(parameters);
}
@@ -122,17 +119,28 @@ public class ContinuousFileMonitoringFunction<OUT>
public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
FileSystem fileSystem = FileSystem.get(new URI(path));
+ checkpointLock = context.getCheckpointLock();
switch (watchType) {
case PROCESS_CONTINUOUSLY:
while (isRunning) {
- monitorDirAndForwardSplits(fileSystem, context);
+ synchronized (checkpointLock) {
+ monitorDirAndForwardSplits(fileSystem, context);
+ }
Thread.sleep(interval);
}
- isRunning = false;
+
+ // here we do not need to set the running to false and the
+ // globalModificationTime to Long.MAX_VALUE because to arrive here,
+ // either close() or cancel() have already been called, so this
+ // is already done.
+
break;
case PROCESS_ONCE:
- monitorDirAndForwardSplits(fileSystem, context);
- isRunning = false;
+ synchronized (checkpointLock) {
+ monitorDirAndForwardSplits(fileSystem, context);
+ globalModificationTime = Long.MAX_VALUE;
+ isRunning = false;
+ }
break;
default:
isRunning = false;
@@ -141,41 +149,22 @@ public class ContinuousFileMonitoringFunction<OUT>
}
private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
- final Object lock = context.getCheckpointLock();
+ assert (Thread.holdsLock(checkpointLock));
- // it may be non-null in the case of a recovery after a failure.
- if (currentSplitsToFwd != null) {
- synchronized (lock) {
- forwardSplits(currentSplitsToFwd, context);
- }
- }
- currentSplitsToFwd = null;
-
- // it may be non-null in the case of a recovery after a failure.
- if (splitsToFwdOrderedAscByModTime == null) {
- splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs);
- }
-
- Iterator<Tuple2<Long, List<FileInputSplit>>> it =
- splitsToFwdOrderedAscByModTime.iterator();
+ List<Tuple2<Long, List<FileInputSplit>>> splitsByModTime = getInputSplitSortedOnModTime(fs);
+ Iterator<Tuple2<Long, List<FileInputSplit>>> it = splitsByModTime.iterator();
while (it.hasNext()) {
- synchronized (lock) {
- currentSplitsToFwd = it.next();
- it.remove();
- forwardSplits(currentSplitsToFwd, context);
- }
+ forwardSplits(it.next(), context);
+ it.remove();
}
-
- // set them to null to distinguish from a restore.
- splitsToFwdOrderedAscByModTime = null;
- currentSplitsToFwd = null;
}
private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
- currentSplitsToFwd = splitsToFwd;
- Long modTime = currentSplitsToFwd.f0;
- List<FileInputSplit> splits = currentSplitsToFwd.f1;
+ assert (Thread.holdsLock(checkpointLock));
+
+ Long modTime = splitsToFwd.f0;
+ List<FileInputSplit> splits = splitsToFwd.f1;
Iterator<FileInputSplit> it = splits.iterator();
while (it.hasNext()) {
@@ -284,6 +273,7 @@ public class ContinuousFileMonitoringFunction<OUT>
* is the time of the most recent modification found in any of the already processed files.
*/
private boolean shouldIgnore(Path filePath, long modificationTime) {
+ assert (Thread.holdsLock(checkpointLock));
boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
if (shouldIgnore) {
LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
@@ -294,35 +284,36 @@ public class ContinuousFileMonitoringFunction<OUT>
@Override
public void close() throws Exception {
super.close();
- isRunning = false;
+ synchronized (checkpointLock) {
+ globalModificationTime = Long.MAX_VALUE;
+ isRunning = false;
+ }
LOG.info("Closed File Monitoring Source.");
}
@Override
public void cancel() {
- isRunning = false;
+ if (checkpointLock != null) {
+ // this is to cover the case where cancel() is called before the run()
+ synchronized (checkpointLock) {
+ globalModificationTime = Long.MAX_VALUE;
+ isRunning = false;
+ }
+ } else {
+ globalModificationTime = Long.MAX_VALUE;
+ isRunning = false;
+ }
}
// --------------------- Checkpointing --------------------------
@Override
- public Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long> snapshotState(
- long checkpointId, long checkpointTimestamp) throws Exception {
-
- if (!isRunning) {
- LOG.debug("snapshotState() called on closed source");
- return null;
- }
- return new Tuple3<>(splitsToFwdOrderedAscByModTime,
- currentSplitsToFwd, globalModificationTime);
+ public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ return globalModificationTime;
}
@Override
- public void restoreState(Tuple3<List<Tuple2<Long, List<FileInputSplit>>>,
- Tuple2<Long, List<FileInputSplit>>, Long> state) throws Exception {
-
- this.splitsToFwdOrderedAscByModTime = state.f0;
- this.currentSplitsToFwd = state.f1;
- this.globalModificationTime = state.f2;
+ public void restoreState(Long state) throws Exception {
+ this.globalModificationTime = state;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 455c753..1c2da34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -104,7 +104,14 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.collector = new TimestampedCollector<>(output);
this.checkpointLock = getContainingTask().getCheckpointLock();
+ Preconditions.checkState(reader == null, "The reader is already initialized.");
+
this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
+
+ // the readerState is needed for the initialization of the reader
+ // when recovering from a failure. So after the initialization,
+ // we can set it to null.
+ this.readerState = null;
this.reader.start();
}
@@ -191,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private volatile boolean isSplitOpen = false;
- SplitReader(FileInputFormat<OT> format,
+ private SplitReader(FileInputFormat<OT> format,
TypeSerializer<OT> serializer,
TimestampedCollector<OT> collector,
Object checkpointLock,
@@ -212,18 +219,19 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
S formatState = restoredState.f2;
for (FileInputSplit split : pending) {
+ Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
pendingSplits.add(split);
}
this.currentSplit = current;
this.restoredFormatState = formatState;
}
- ContinuousFileReaderOperator.this.readerState = null;
}
- void addSplit(FileInputSplit split) {
+ private void addSplit(FileInputSplit split) {
Preconditions.checkNotNull(split);
synchronized (checkpointLock) {
+ Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
this.pendingSplits.add(split);
}
}
@@ -323,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
}
- Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
+ private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
for (FileInputSplit split: this.pendingSplits) {
snapshot.add(split);
@@ -334,9 +342,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.pendingSplits.remove();
}
- if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
- S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
- return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
+ if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
+ S formatState = this.isSplitOpen ?
+ (S) ((CheckpointableInputFormat) format).getCurrentState() :
+ restoredFormatState;
+ return new Tuple3<>(snapshot, currentSplit, formatState);
} else {
LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
return new Tuple3<>(snapshot, currentSplit, null);
@@ -405,6 +415,9 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
S formatState = (S) ois.readObject();
// set the whole reader state for the open() to find.
+ Preconditions.checkState(this.readerState == null,
+ "The reader state has already been initialized.");
+
this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
div.close();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index f35cbba..e3e5c54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -85,7 +85,11 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
while (isRunning && !format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);
- ctx.collect(nextElement);
+ if (nextElement != null) {
+ ctx.collect(nextElement);
+ } else {
+ break;
+ }
}
format.close();
completedSplitsCounter.inc();
http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 4c0f648..d540a92 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -219,6 +219,8 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
stream.write(line.getBytes());
}
stream.close();
+
+ Assert.assertTrue("Result file present", !fs.exists(file));
fs.rename(tmp, file);
Assert.assertTrue("No result file present", fs.exists(file));
return new Tuple2<>(file, str.toString());
[3/3] flink git commit: Remove polluting log message in
ContinuousFileReaderOperator
Posted by al...@apache.org.
Remove polluting log message in ContinuousFileReaderOperator
Before, when snapshotting, we printed a log message about the file
input format not being checkpointable when the current split was
"null". Now, we only print the message when when appropriate.
This closes #2174
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c6b17b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c6b17b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c6b17b4
Branch: refs/heads/master
Commit: 6c6b17b4d47d281b0e5dcf4413fd1ad53ce49eee
Parents: a9733a9
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Jun 30 11:46:52 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 30 15:31:10 2016 +0200
----------------------------------------------------------------------
.../source/ContinuousFileReaderOperator.java | 33 ++++++++++++++------
1 file changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c6b17b4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 1c2da34..0daa7ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -65,6 +65,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
@@ -75,7 +77,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
private FileInputFormat<OUT> format;
private TypeSerializer<OUT> serializer;
- private Object checkpointLock;
+ private transient Object checkpointLock;
private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
@@ -259,7 +261,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
}
if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
- ((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
+
+ @SuppressWarnings("unchecked")
+ CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+ (CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+ checkpointableFormat.reopen(currentSplit, restoredFormatState);
} else {
// this is the case of a non-checkpointable input format that will reprocess the last split.
LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
@@ -342,14 +349,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
this.pendingSplits.remove();
}
- if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
- S formatState = this.isSplitOpen ?
- (S) ((CheckpointableInputFormat) format).getCurrentState() :
- restoredFormatState;
- return new Tuple3<>(snapshot, currentSplit, formatState);
+ if (this.currentSplit != null) {
+ if (this.format instanceof CheckpointableInputFormat) {
+ @SuppressWarnings("unchecked")
+ CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+ (CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+ S formatState = this.isSplitOpen ?
+ checkpointableFormat.getCurrentState() :
+ restoredFormatState;
+ return new Tuple3<>(snapshot, currentSplit, formatState);
+ } else {
+ LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+ return new Tuple3<>(snapshot, currentSplit, null);
+ }
} else {
- LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
- return new Tuple3<>(snapshot, currentSplit, null);
+ return new Tuple3<>(snapshot, null, null);
}
}