Posted to commits@flink.apache.org by se...@apache.org on 2020/09/21 19:17:14 UTC

[flink] branch master updated (f7ec186 -> 2da55b8)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f7ec186  [FLINK-19301][python] Improve the package structure of Python DataStream API
     new 90d01f7  [FLINK-19161][file connector] Add first version of the FLIP-27 File Source
     new 2da55b8  [hotfix][runtime] Remove commented-out annotation in SourceOperator

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../SingleThreadMultiplexSourceReaderBase.java     |  63 ++-
 .../source/reader/mocks/TestingReaderContext.java  |   4 +-
 flink-connectors/flink-connector-files/pom.xml     |  83 ++++
 .../file/src/ContinuousEnumerationSettings.java    |  55 +++
 .../flink/connector/file/src/FileSource.java       | 437 +++++++++++++++++++++
 .../flink/connector/file/src/FileSourceSplit.java  | 219 +++++++++++
 .../file/src/FileSourceSplitSerializer.java        | 128 ++++++
 .../connector/file/src/FileSourceSplitState.java   | 106 +++++
 .../file/src/PendingSplitsCheckpoint.java          |  93 +++++
 .../src/PendingSplitsCheckpointSerializer.java     | 150 +++++++
 .../file/src/assigners/FileSplitAssigner.java      |  71 ++++
 .../file/src/assigners/SimpleSplitAssigner.java    |  65 +++
 .../src/compression/StandardDeCompressors.java     | 105 +++++
 .../BlockSplittingRecursiveEnumerator.java         | 150 +++++++
 .../file/src/enumerate/DefaultFileFilter.java      |  42 ++
 .../file/src/enumerate/FileEnumerator.java         |  57 +++
 .../enumerate/NonSplittingRecursiveEnumerator.java | 144 +++++++
 .../src/impl/ContinuousFileSplitEnumerator.java    | 162 ++++++++
 .../file/src/impl/FileRecordFormatAdapter.java     | 168 ++++++++
 .../flink/connector/file/src/impl/FileRecords.java | 109 +++++
 .../connector/file/src/impl/FileSourceReader.java  |  72 ++++
 .../file/src/impl/FileSourceRecordEmitter.java     |  47 +++
 .../file/src/impl/FileSourceSplitReader.java       | 116 ++++++
 .../file/src/impl/StaticFileSplitEnumerator.java   | 128 ++++++
 .../file/src/impl/StreamFormatAdapter.java         | 269 +++++++++++++
 .../connector/file/src/reader/BulkFormat.java      | 199 ++++++++++
 .../file/src/reader/FileRecordFormat.java          | 200 ++++++++++
 .../file/src/reader/SimpleStreamFormat.java        | 115 ++++++
 .../connector/file/src/reader/StreamFormat.java    | 219 +++++++++++
 .../connector/file/src/reader/TextLineFormat.java  |  96 +++++
 .../file/src/util/ArrayResultIterator.java         |  90 +++++
 .../file/src/util/CheckpointedPosition.java        | 107 +++++
 .../file/src/util/IteratorResultIterator.java      |  95 +++++
 .../file/src/util/MutableRecordAndPosition.java    |  57 +++
 .../apache/flink/connector/file/src/util/Pool.java | 115 ++++++
 .../connector/file/src/util/RecordAndPosition.java |  90 +++++
 .../file/src/util/RecyclableIterator.java          |  50 +++
 .../file/src/util/SingletonResultIterator.java     |  75 ++++
 .../flink/connector/file/src/util/Utils.java       |  72 ++++
 .../file/src/FileSourceHeavyThroughputTest.java    | 230 +++++++++++
 .../file/src/FileSourceSplitSerializerTest.java    | 128 ++++++
 .../file/src/FileSourceSplitStateTest.java         |  82 ++++
 .../connector/file/src/FileSourceSplitTest.java    |  34 ++
 .../file/src/FileSourceTextLinesITCase.java        | 273 +++++++++++++
 .../src/PendingSplitsCheckpointSerializerTest.java | 150 +++++++
 .../BlockSplittingRecursiveEnumeratorTest.java     |  62 +++
 .../NonSplittingRecursiveEnumeratorTest.java       | 204 ++++++++++
 .../connector/file/src/impl/AdapterTestBase.java   | 284 +++++++++++++
 .../file/src/impl/FileRecordFormatAdapterTest.java | 199 ++++++++++
 .../connector/file/src/impl/FileRecordsTest.java   | 126 ++++++
 .../file/src/impl/StreamFormatAdapterTest.java     | 197 ++++++++++
 .../connector/file/src/impl/TestIntReader.java     |  90 +++++
 .../file/src/testutils/TestingFileSystem.java      | 391 ++++++++++++++++++
 .../file/src/util/ArrayResultIteratorTest.java     |  94 +++++
 .../file/src/util/IteratorResultIteratorTest.java  |  60 +++
 .../file/src/util/SingletonResultIteratorTest.java |  81 ++++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 flink-connectors/pom.xml                           |   1 +
 flink-dist/pom.xml                                 |   6 +
 .../streaming/api/operators/SourceOperator.java    |   6 +-
 60 files changed, 7335 insertions(+), 14 deletions(-)
 create mode 100644 flink-connectors/flink-connector-files/pom.xml
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java
 create mode 100644 flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java
 create mode 100644 flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties


[flink] 01/02: [FLINK-19161][file connector] Add first version of the FLIP-27 File Source

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90d01f7409c579313e70fcbfa9e0342e6af50ff8
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 11:00:59 2020 +0200

    [FLINK-19161][file connector] Add first version of the FLIP-27 File Source
    
    This closes #13401
---
 .../SingleThreadMultiplexSourceReaderBase.java     |  63 ++-
 .../source/reader/mocks/TestingReaderContext.java  |   4 +-
 flink-connectors/flink-connector-files/pom.xml     |  83 ++++
 .../file/src/ContinuousEnumerationSettings.java    |  55 +++
 .../flink/connector/file/src/FileSource.java       | 437 +++++++++++++++++++++
 .../flink/connector/file/src/FileSourceSplit.java  | 219 +++++++++++
 .../file/src/FileSourceSplitSerializer.java        | 128 ++++++
 .../connector/file/src/FileSourceSplitState.java   | 106 +++++
 .../file/src/PendingSplitsCheckpoint.java          |  93 +++++
 .../src/PendingSplitsCheckpointSerializer.java     | 150 +++++++
 .../file/src/assigners/FileSplitAssigner.java      |  71 ++++
 .../file/src/assigners/SimpleSplitAssigner.java    |  65 +++
 .../src/compression/StandardDeCompressors.java     | 105 +++++
 .../BlockSplittingRecursiveEnumerator.java         | 150 +++++++
 .../file/src/enumerate/DefaultFileFilter.java      |  42 ++
 .../file/src/enumerate/FileEnumerator.java         |  57 +++
 .../enumerate/NonSplittingRecursiveEnumerator.java | 144 +++++++
 .../src/impl/ContinuousFileSplitEnumerator.java    | 162 ++++++++
 .../file/src/impl/FileRecordFormatAdapter.java     | 168 ++++++++
 .../flink/connector/file/src/impl/FileRecords.java | 109 +++++
 .../connector/file/src/impl/FileSourceReader.java  |  72 ++++
 .../file/src/impl/FileSourceRecordEmitter.java     |  47 +++
 .../file/src/impl/FileSourceSplitReader.java       | 116 ++++++
 .../file/src/impl/StaticFileSplitEnumerator.java   | 128 ++++++
 .../file/src/impl/StreamFormatAdapter.java         | 269 +++++++++++++
 .../connector/file/src/reader/BulkFormat.java      | 199 ++++++++++
 .../file/src/reader/FileRecordFormat.java          | 200 ++++++++++
 .../file/src/reader/SimpleStreamFormat.java        | 115 ++++++
 .../connector/file/src/reader/StreamFormat.java    | 219 +++++++++++
 .../connector/file/src/reader/TextLineFormat.java  |  96 +++++
 .../file/src/util/ArrayResultIterator.java         |  90 +++++
 .../file/src/util/CheckpointedPosition.java        | 107 +++++
 .../file/src/util/IteratorResultIterator.java      |  95 +++++
 .../file/src/util/MutableRecordAndPosition.java    |  57 +++
 .../apache/flink/connector/file/src/util/Pool.java | 115 ++++++
 .../connector/file/src/util/RecordAndPosition.java |  90 +++++
 .../file/src/util/RecyclableIterator.java          |  50 +++
 .../file/src/util/SingletonResultIterator.java     |  75 ++++
 .../flink/connector/file/src/util/Utils.java       |  72 ++++
 .../file/src/FileSourceHeavyThroughputTest.java    | 230 +++++++++++
 .../file/src/FileSourceSplitSerializerTest.java    | 128 ++++++
 .../file/src/FileSourceSplitStateTest.java         |  82 ++++
 .../connector/file/src/FileSourceSplitTest.java    |  34 ++
 .../file/src/FileSourceTextLinesITCase.java        | 273 +++++++++++++
 .../src/PendingSplitsCheckpointSerializerTest.java | 150 +++++++
 .../BlockSplittingRecursiveEnumeratorTest.java     |  62 +++
 .../NonSplittingRecursiveEnumeratorTest.java       | 204 ++++++++++
 .../connector/file/src/impl/AdapterTestBase.java   | 284 +++++++++++++
 .../file/src/impl/FileRecordFormatAdapterTest.java | 199 ++++++++++
 .../connector/file/src/impl/FileRecordsTest.java   | 126 ++++++
 .../file/src/impl/StreamFormatAdapterTest.java     | 197 ++++++++++
 .../connector/file/src/impl/TestIntReader.java     |  90 +++++
 .../file/src/testutils/TestingFileSystem.java      | 391 ++++++++++++++++++
 .../file/src/util/ArrayResultIteratorTest.java     |  94 +++++
 .../file/src/util/IteratorResultIteratorTest.java  |  60 +++
 .../file/src/util/SingletonResultIteratorTest.java |  81 ++++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 flink-connectors/pom.xml                           |   1 +
 flink-dist/pom.xml                                 |   6 +
 .../streaming/api/operators/SourceOperator.java    |   5 +-
 60 files changed, 7335 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index ab87db0..f5806d1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -26,24 +26,67 @@ import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcher
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
+import java.util.Collection;
 import java.util.function.Supplier;
 
 /**
- * A abstract {@link SourceReader} implementation that assign all the splits to a single thread to consume.
- * @param <E>
- * @param <T>
- * @param <SplitT>
- * @param <SplitStateT>
+ * A base for {@link SourceReader}s that read splits with one thread using one {@link SplitReader}.
+ * The splits can be read either one after the other (like in a file source) or concurrently by changing
+ * the subscription in the split reader (like in the Kafka Source).
+ *
+ * <p>To implement a source reader based on this class, implementors need to supply the following:
+ * <ul>
+ *   <li>A {@link SplitReader}, which connects to the source and reads/polls data. The split reader
+ *       gets notified whenever there is a new split. The split reader would read files, contain a
+ *       Kafka or other source client, etc.</li>
+ *   <li>A {@link RecordEmitter} that takes a record from the Split Reader and updates the checkpointing
+ *       state and converts it into the final form. For example, for Kafka, the Record Emitter takes a
+ *       {@code ConsumerRecord}, puts the offset information into the state, transforms the record with the
+ *       deserializers into the final type, and emits the record.</li>
+ *   <li>The class must override the methods to convert back and forth between the immutable splits
+ *       ({@code SplitT}) and the mutable split state representation ({@code SplitStateT}).</li>
+ *   <li>Finally, the reader must decide what to do when it starts ({@link #start()}) or when a split is
+ *       finished ({@link #onSplitFinished(Collection)}).</li>
+ * </ul>
+ *
+ * @param <E> The type of the records (the raw type that typically contains checkpointing information).
+ * @param <T> The final type of the records emitted by the source.
+ * @param <SplitT> The type of the splits processed by the source.
+ * @param <SplitStateT> The type of the mutable state per split.
  */
 public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
 	extends SourceReaderBase<E, T, SplitT, SplitStateT> {
 
+	/**
+	 * The primary constructor for the source reader.
+	 *
+	 * <p>The reader will use a handover queue sized as configured via
+	 * {@link SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
+	 */
+	public SingleThreadMultiplexSourceReaderBase(
+			Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+			RecordEmitter<E, T, SplitStateT> recordEmitter,
+			Configuration config,
+			SourceReaderContext context) {
+		this(
+			new FutureCompletingBlockingQueue<>(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
+			splitReaderSupplier,
+			recordEmitter,
+			config,
+			context);
+	}
+
+	/**
+	 * This constructor behaves like
+	 * {@link #SingleThreadMultiplexSourceReaderBase(Supplier, RecordEmitter, Configuration, SourceReaderContext)},
+	 * but accepts a specific {@link FutureCompletingBlockingQueue}.
+	 */
 	public SingleThreadMultiplexSourceReaderBase(
-		FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-		Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
-		RecordEmitter<E, T, SplitStateT> recordEmitter,
-		Configuration config,
-		SourceReaderContext context) {
+			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+			Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+			RecordEmitter<E, T, SplitStateT> recordEmitter,
+			Configuration config,
+			SourceReaderContext context) {
 		super(
 			elementsQueue,
 			new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier),
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
index 02faf1f..5bb9b59 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
@@ -64,7 +64,9 @@ public class TestingReaderContext implements SourceReaderContext {
 	}
 
 	@Override
-	public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+	public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+		sentEvents.add(sourceEvent);
+	}
 
 	// ------------------------------------------------------------------------
 
diff --git a/flink-connectors/flink-connector-files/pom.xml b/flink-connectors/flink-connector-files/pom.xml
new file mode 100644
index 0000000..e1a5149
--- /dev/null
+++ b/flink-connectors/flink-connector-files/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.12-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-files</artifactId>
+	<name>Flink : Connectors : Files</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>1.12-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+</project>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
new file mode 100644
index 0000000..4123c43
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Settings describing how to do continuous file discovery and enumeration in the
+ * file source's continuous streaming mode.
+ */
+@Internal
+final class ContinuousEnumerationSettings implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Duration discoveryInterval;
+
+	ContinuousEnumerationSettings(Duration discoveryInterval) {
+		this.discoveryInterval = checkNotNull(discoveryInterval);
+	}
+
+	public Duration getDiscoveryInterval() {
+		return discoveryInterval;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "ContinuousEnumerationSettings{" +
+			"discoveryInterval=" + discoveryInterval +
+			'}';
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
new file mode 100644
index 0000000..296abf9
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
+import org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator;
+import org.apache.flink.connector.file.src.impl.FileRecordFormatAdapter;
+import org.apache.flink.connector.file.src.impl.FileSourceReader;
+import org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator;
+import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified data source that reads files - both in batch and in streaming mode.
+ *
+ * <p>This source supports all (distributed) file systems and object stores that can be
+ * accessed via the Flink's {@link FileSystem} class.
+ *
+ * <p>Start building a file source via one of the following calls:
+ * <ul>
+ *   <li>{@link FileSource#forRecordStreamFormat(StreamFormat, Path...)}</li>
+ *   <li>{@link FileSource#forBulkFileFormat(BulkFormat, Path...)}</li>
+ *   <li>{@link FileSource#forRecordFileFormat(FileRecordFormat, Path...)}</li>
+ * </ul>
+ * This creates a {@link FileSource.FileSourceBuilder} on which you can configure all the
+ * properties of the file source.
+ *
+ * <h2>Batch and Streaming</h2>
+ *
+ * <p>This source supports both bounded/batch and continuous/streaming data inputs. For the
+ * bounded/batch case, the file source processes all files under the given path(s).
+ * In the continuous/streaming case, the source periodically checks the paths for new files
+ * and will start reading those.
+ *
+ * <p>When you start creating a file source (via the {@link FileSource.FileSourceBuilder} created
+ * through one of the above-mentioned methods) the source is by default in bounded/batch mode.
+ * Call {@link FileSource.FileSourceBuilder#monitorContinuously(Duration)} to put the source
+ * into continuous streaming mode.
+ *
+ * <h2>Format Types</h2>
+ *
+ * <p>The reading of each file happens through file readers defined by <i>file formats</i>.
+ * These define the parsing logic for the contents of the file. The source supports several such
+ * format classes, whose interfaces trade off simplicity of implementation against flexibility and efficiency.
+ * <ul>
+ *     <li>A {@link StreamFormat} reads the contents of a file from a file stream. It is the simplest
+ *         format to implement, and provides many features out-of-the-box (like checkpointing logic)
+ *         but is limited in the optimizations it can apply (such as object reuse, batching, etc.).</li>
+ *     <li>A {@link BulkFormat} reads batches of records from a file at a time. It is the most "low level"
+ *         format to implement, but offers the greatest flexibility to optimize the implementation.</li>
+ *     <li>A {@link FileRecordFormat} is in the middle of the trade-off spectrum between the
+ *         {@code StreamFormat} and the {@code BulkFormat}.</li>
+ * </ul>
+ *
+ * <h2>Discovering / Enumerating Files</h2>
+ *
+ * <p>The way that the source lists the files to be processed is defined by the {@link FileEnumerator}.
+ * The {@code FileEnumerator} is responsible for selecting the relevant files (for example filtering out
+ * hidden files) and for optionally splitting files into multiple regions (= file source splits) that
+ * can be read in parallel.
+ *
+ * @param <T> The type of the events/records produced by this source.
+ */
+@PublicEvolving
+public final class FileSource<T> implements Source<T, FileSourceSplit, PendingSplitsCheckpoint>, ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The default split assigner, a lazy non-locality-aware assigner.
+	 */
+	public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER = SimpleSplitAssigner::new;
+
+	/**
+	 * The default file enumerator used for splittable formats.
+	 * The enumerator recursively enumerates files, splits files that consist of multiple distributed storage
+	 * blocks into multiple splits, and filters hidden files (files starting with '.' or '_').
+	 * Files with suffixes of common compression formats (for example '.gzip', '.bz2', '.xz', '.zip', ...)
+	 * will not be split.
+	 */
+	public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR = BlockSplittingRecursiveEnumerator::new;
+
+	/**
+	 * The default file enumerator used for non-splittable formats.
+	 * The enumerator recursively enumerates files, creates one split per file, and filters hidden
+	 * files (files starting with '.' or '_').
+	 */
+	public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR = NonSplittingRecursiveEnumerator::new;
+
+	// ------------------------------------------------------------------------
+
+	private final Path[] inputPaths;
+
+	private final FileEnumerator.Provider enumeratorFactory;
+
+	private final FileSplitAssigner.Provider assignerFactory;
+
+	private final BulkFormat<T> readerFormat;
+
+	@Nullable
+	private final ContinuousEnumerationSettings continuousEnumerationSettings;
+
+	// ------------------------------------------------------------------------
+
+	private FileSource(
+			final Path[] inputPaths,
+			final FileEnumerator.Provider fileEnumerator,
+			final FileSplitAssigner.Provider splitAssigner,
+			final BulkFormat<T> readerFormat,
+			@Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) {
+
+		checkArgument(inputPaths.length > 0);
+		this.inputPaths = inputPaths;
+		this.enumeratorFactory = checkNotNull(fileEnumerator);
+		this.assignerFactory = checkNotNull(splitAssigner);
+		this.readerFormat = checkNotNull(readerFormat);
+		this.continuousEnumerationSettings = continuousEnumerationSettings;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Source API Methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Boundedness getBoundedness() {
+		return continuousEnumerationSettings == null ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+	}
+
+	@Override
+	public SourceReader<T, FileSourceSplit> createReader(SourceReaderContext readerContext) {
+		return new FileSourceReader<>(readerContext, readerFormat, readerContext.getConfiguration());
+	}
+
+	@Override
+	public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> createEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> enumContext) {
+
+		final FileEnumerator enumerator = enumeratorFactory.create();
+
+		// read the initial set of splits (which is also the total set of splits for bounded sources)
+		final Collection<FileSourceSplit> splits;
+		try {
+			// TODO - in the next cleanup pass, we should try to remove the need to "wrap unchecked" here
+			splits = enumerator.enumerateSplits(inputPaths, enumContext.currentParallelism());
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Could not enumerate file splits", e);
+		}
+
+		return createSplitEnumerator(enumContext, enumerator, splits, null);
+	}
+
+	@Override
+	public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> enumContext,
+			PendingSplitsCheckpoint checkpoint) throws IOException {
+
+		final FileEnumerator enumerator = enumeratorFactory.create();
+
+		return createSplitEnumerator(enumContext, enumerator, checkpoint.getSplits(), checkpoint.getAlreadyProcessedPaths());
+	}
+
+	@Override
+	public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
+		return FileSourceSplitSerializer.INSTANCE;
+	}
+
+	@Override
+	public SimpleVersionedSerializer<PendingSplitsCheckpoint> getEnumeratorCheckpointSerializer() {
+		return PendingSplitsCheckpointSerializer.INSTANCE;
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return readerFormat.getProducedType();
+	}
+
+	// ------------------------------------------------------------------------
+	//  helpers
+	// ------------------------------------------------------------------------
+
+	private SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> createSplitEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> context,
+			FileEnumerator enumerator,
+			Collection<FileSourceSplit> splits,
+			@Nullable Collection<Path> alreadyProcessedPaths) {
+
+		final FileSplitAssigner splitAssigner = assignerFactory.create(splits);
+
+		if (continuousEnumerationSettings == null) {
+			// bounded case
+			return new StaticFileSplitEnumerator(context, splitAssigner);
+		} else {
+			// unbounded case
+			if (alreadyProcessedPaths == null) {
+				alreadyProcessedPaths = splitsToPaths(splits);
+			}
+
+			return new ContinuousFileSplitEnumerator(
+					context,
+					enumerator,
+					splitAssigner,
+					inputPaths,
+					alreadyProcessedPaths,
+					continuousEnumerationSettings.getDiscoveryInterval().toMillis());
+		}
+	}
+
+	private static Collection<Path> splitsToPaths(Collection<FileSourceSplit> splits) {
+		return splits.stream()
+			.map(FileSourceSplit::path)
+			.collect(Collectors.toCollection(HashSet::new));
+	}
+
+	// ------------------------------------------------------------------------
+	//  Entry-point Factory Methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Builds a new {@code FileSource} using a {@link StreamFormat} to read record-by-record from a
+	 * file stream.
+	 *
+	 * <p>When possible, stream-based formats are generally easier to implement than, and preferable to,
+	 * file-based formats, because they support better default behavior around I/O batching and better
+	 * progress tracking to avoid re-doing work on recovery.
+	 */
+	public static <T> FileSourceBuilder<T> forRecordStreamFormat(final StreamFormat<T> reader, final Path... paths) {
+		checkNotNull(reader, "reader");
+		checkNotNull(paths, "paths");
+		checkArgument(paths.length > 0, "paths must not be empty");
+
+		final BulkFormat<T> bulkFormat = new StreamFormatAdapter<>(reader);
+		return new FileSourceBuilder<>(paths, bulkFormat);
+	}
+
+	/**
+	 * Builds a new {@code FileSource} using a {@link BulkFormat} to read batches of records
+	 * from files.
+	 *
+	 * <p>Examples for bulk readers are compressed and vectorized formats such as ORC or Parquet.
+	 */
+	public static <T> FileSourceBuilder<T> forBulkFileFormat(final BulkFormat<T> reader, final Path... paths) {
+		checkNotNull(reader, "reader");
+		checkNotNull(paths, "paths");
+		checkArgument(paths.length > 0, "paths must not be empty");
+
+		return new FileSourceBuilder<>(paths, reader);
+	}
+
+	/**
+	 * Builds a new {@code FileSource} using a {@link FileRecordFormat} to read record-by-record from a
+	 * file path.
+	 *
+	 * <p>A {@code FileRecordFormat} is more general than the {@link StreamFormat}, but also
+	 * often requires more careful parametrization.
+	 */
+	public static <T> FileSourceBuilder<T> forRecordFileFormat(final FileRecordFormat<T> reader, final Path... paths) {
+		checkNotNull(reader, "reader");
+		checkNotNull(paths, "paths");
+		checkArgument(paths.length > 0, "paths must not be empty");
+
+		final BulkFormat<T> bulkFormat = new FileRecordFormatAdapter<>(reader);
+		return new FileSourceBuilder<>(paths, bulkFormat);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Builder
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The builder for the {@code FileSource}, to configure the various behaviors.
+	 *
+	 * <p>Start building the source via one of the following methods:
+	 * <ul>
+	 *   <li>{@link FileSource#forRecordStreamFormat(StreamFormat, Path...)}</li>
+	 *   <li>{@link FileSource#forBulkFileFormat(BulkFormat, Path...)}</li>
+	 *   <li>{@link FileSource#forRecordFileFormat(FileRecordFormat, Path...)}</li>
+	 * </ul>
+	 */
+	public static final class FileSourceBuilder<T> extends AbstractFileSourceBuilder<T, FileSourceBuilder<T>> {
+		public FileSourceBuilder(Path[] inputPaths, BulkFormat<T> readerFormat) {
+			super(inputPaths, readerFormat);
+		}
+	}
+
+	/**
+	 * The generic base builder. This builder carries a <i>SELF</i> type to make it convenient to
+	 * extend this for subclasses, using the following pattern.
+	 * <pre>{@code
+	 * public class SubBuilder<T> extends AbstractFileSourceBuilder<T, SubBuilder<T>> {
+	 *     ...
+	 * }
+	 * }</pre>
+	 * That way, all return values from the builder methods defined here are typed to the sub-class
+	 * type and support fluent chaining.
+	 *
+	 * <p>We don't make the publicly visible builder generic with a SELF type, because it leads to
+	 * generic signatures that can look complicated and confusing.
+	 */
+	public static class AbstractFileSourceBuilder<T, SELF extends AbstractFileSourceBuilder<T, SELF>> {
+
+		// mandatory - have no defaults
+		private final Path[] inputPaths;
+		private final BulkFormat<T> readerFormat;
+
+		// optional - have defaults
+		private FileEnumerator.Provider fileEnumerator;
+		private FileSplitAssigner.Provider splitAssigner;
+		@Nullable
+		private ContinuousEnumerationSettings continuousSourceSettings;
+
+		protected AbstractFileSourceBuilder(Path[] inputPaths, BulkFormat<T> readerFormat) {
+			this.inputPaths = checkNotNull(inputPaths);
+			this.readerFormat = checkNotNull(readerFormat);
+
+			this.fileEnumerator = readerFormat.isSplittable()
+					? DEFAULT_SPLITTABLE_FILE_ENUMERATOR
+					: DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR;
+			this.splitAssigner = DEFAULT_SPLIT_ASSIGNER;
+		}
+
+		/**
+		 * Creates the file source with the settings applied to this builder.
+		 */
+		public FileSource<T> build() {
+			return new FileSource<>(
+				inputPaths,
+				fileEnumerator,
+				splitAssigner,
+				readerFormat,
+				continuousSourceSettings);
+		}
+
+		/**
+		 * Sets this source to streaming ("continuous monitoring") mode.
+		 *
+		 * <p>This makes the source a "continuous streaming" source that keeps running, monitoring
+		 * for new files, and reads these files as soon as the monitoring discovers them.
+		 *
+		 * <p>The interval in which the source checks for new files is the {@code discoveryInterval}.
+		 * Shorter intervals mean that files are discovered more quickly, but also imply more frequent
+		 * listing or directory traversal of the file system / object store.
+		 */
+		public SELF monitorContinuously(Duration discoveryInterval) {
+			checkNotNull(discoveryInterval, "discoveryInterval");
+			checkArgument(!(discoveryInterval.isNegative() || discoveryInterval.isZero()), "discoveryInterval must be > 0");
+
+			this.continuousSourceSettings = new ContinuousEnumerationSettings(discoveryInterval);
+			return self();
+		}
+
+		/**
+		 * Sets this source to bounded (batch) mode.
+		 *
+		 * <p>In this mode, the source processes the files that are under the given paths when the
+		 * application is started. Once all files are processed, the source will finish.
+		 *
+		 * <p>This setting is also the default behavior. This method is mainly here to "switch back" to
+		 * bounded (batch) mode, or to make it explicit in the source construction.
+		 */
+		public SELF processStaticFileSet() {
+			this.continuousSourceSettings = null;
+			return self();
+		}
+
+		/**
+		 * Configures the {@link FileEnumerator} for the source.
+		 * The File Enumerator is responsible for selecting from the input path the set of files
+		 * that should be processed (and which to filter out). Furthermore, the File Enumerator
+		 * may split the files further into sub-regions, to enable parallelization beyond the number
+		 * of files.
+		 */
+		public SELF setFileEnumerator(FileEnumerator.Provider fileEnumerator) {
+			this.fileEnumerator = checkNotNull(fileEnumerator);
+			return self();
+		}
+
+		/**
+		 * Configures the {@link FileSplitAssigner} for the source.
+		 * The File Split Assigner determines which parallel reader instance gets which
+		 * {@link FileSourceSplit}, and in which order these splits are assigned.
+		 */
+		public SELF setSplitAssigner(FileSplitAssigner.Provider splitAssigner) {
+			this.splitAssigner = checkNotNull(splitAssigner);
+			return self();
+		}
+
+		@SuppressWarnings("unchecked")
+		private SELF self() {
+			return (SELF) this;
+		}
+	}
+}
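
The builder entry points above are best seen end-to-end. Below is a rough usage sketch, not part of this commit: it assumes the TextLineFormat added in this patch has a no-argument constructor, and that the surrounding DataStream API (StreamExecutionEnvironment#fromSource, WatermarkStrategy#noWatermarks) is available in the target Flink version; paths and intervals are made up.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceUsageSketch {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// read text lines and keep monitoring the input directory for new files every 30 seconds;
		// without the monitorContinuously(...) call the source stays in its default bounded/batch mode
		final FileSource<String> source = FileSource
				.forRecordStreamFormat(new TextLineFormat(), new Path("file:///tmp/input"))
				.monitorContinuously(Duration.ofSeconds(30))
				.build();

		final DataStream<String> lines =
				env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

		lines.print();
		env.execute("file source sketch");
	}
}
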
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
new file mode 100644
index 0000000..7f85618
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceSplit} that represents a file, or a region of a file.
+ *
+ * <p>The split has an offset and an end, which define the region of the file represented by
+ * the split. For splits representing the whole file, the offset is zero and the length is the
+ * file size.
+ *
+ * <p>The split may furthermore have a "reader position", which is the checkpointed position from
+ * a reader previously reading this split. This position is typically null when the split is assigned
+ * from the enumerator to the readers, and is non-null when the readers checkpoint their state
+ * in a file source split.
+ *
+ * <p>This class is {@link Serializable} for convenience. For Flink's internal serialization (both for
+ * RPC and for checkpoints), the {@link FileSourceSplitSerializer} is used.
+ */
+@PublicEvolving
+public class FileSourceSplit implements SourceSplit, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String[] NO_HOSTS = StringUtils.EMPTY_STRING_ARRAY;
+
+	/** The unique ID of the split. Unique within the scope of this source. */
+	private final String id;
+
+	/** The path of the file referenced by this split. */
+	private final Path filePath;
+
+	/** The position of the first byte in the file to process. */
+	private final long offset;
+
+	/** The number of bytes in the file to process. */
+	private final long length;
+
+	/** The names of the hosts storing this range of the file. Empty, if no host information is available. */
+	private final String[] hostnames;
+
+	/** The precise reader position in the split, to resume from. */
+	@Nullable
+	private final CheckpointedPosition readerPosition;
+
+	/** The splits are frequently serialized into checkpoints.
+	 * Caching the byte representation makes repeated serialization cheap.
+	 * This field is used by {@link FileSourceSplitSerializer}. */
+	@Nullable
+	transient byte[] serializedFormCache;
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Constructs a split without host information.
+	 *
+	 * @param id The unique ID of this source split.
+	 * @param filePath The path to the file.
+	 * @param offset The start (inclusive) of the split's range in the file.
+	 * @param length The number of bytes in the split (starting from the offset).
+	 */
+	public FileSourceSplit(String id, Path filePath, long offset, long length) {
+		this(id, filePath, offset, length, NO_HOSTS);
+	}
+
+	/**
+	 * Constructs a split with host information.
+	 *
+	 * @param id The unique ID of this source split.
+	 * @param filePath The path to the file.
+	 * @param offset The start (inclusive) of the split's range in the file.
+	 * @param length The number of bytes in the split (starting from the offset).
+	 * @param hostnames The hostnames of the nodes storing the split's file range.
+	 */
+	public FileSourceSplit(String id, Path filePath, long offset, long length, String... hostnames) {
+		this(id, filePath, offset, length, hostnames, null, null);
+	}
+
+	/**
+	 * Constructs a split with host information and a checkpointed reader position.
+	 *
+	 * @param id The unique ID of this source split.
+	 * @param filePath The path to the file.
+	 * @param offset The start (inclusive) of the split's range in the file.
+	 * @param length The number of bytes in the split (starting from the offset).
+	 * @param hostnames The hostnames of the nodes storing the split's file range.
+	 * @param readerPosition The checkpointed position from a reader previously reading this split, or null.
+	 */
+	public FileSourceSplit(
+			String id,
+			Path filePath,
+			long offset,
+			long length,
+			String[] hostnames,
+			@Nullable CheckpointedPosition readerPosition) {
+		this(id, filePath, offset, length, hostnames, readerPosition, null);
+	}
+
+	/**
+	 * Package private constructor, used by the serializers to directly cache the serialized form.
+	 */
+	FileSourceSplit(
+			String id,
+			Path filePath,
+			long offset,
+			long length,
+			String[] hostnames,
+			@Nullable CheckpointedPosition readerPosition,
+			@Nullable byte[] serializedForm) {
+
+		checkArgument(offset >= 0, "offset must be >= 0");
+		checkArgument(length >= 0, "length must be >= 0");
+		checkNoNullHosts(hostnames);
+
+		this.id = checkNotNull(id);
+		this.filePath = checkNotNull(filePath);
+		this.offset = offset;
+		this.length = length;
+		this.hostnames = hostnames;
+		this.readerPosition = readerPosition;
+		this.serializedFormCache = serializedForm;
+	}
+
+	// ------------------------------------------------------------------------
+	//  split properties
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String splitId() {
+		return id;
+	}
+
+	/**
+	 * Gets the file's path.
+	 */
+	public Path path() {
+		return filePath;
+	}
+
+	/**
+	 * Returns the start of the file region referenced by this source split.
+	 * The position is inclusive, the value indicates the first byte that is part of the split.
+	 */
+	public long offset() {
+		return offset;
+	}
+
+	/**
+	 * Returns the number of bytes in the file region described by this source split.
+	 */
+	public long length() {
+		return length;
+	}
+
+	/**
+	 * Gets the hostnames of the nodes storing the file range described by this split.
+	 * The returned array is empty, if no host information is available.
+	 *
+	 * <p>Host information is typically only available on specific file systems, like HDFS.
+	 */
+	public String[] hostnames() {
+		return hostnames;
+	}
+
+	/**
+	 * Gets the (checkpointed) position of the reader, if set.
+	 * This value is typically absent for splits when assigned from the enumerator to the readers,
+	 * and present when the splits are recovered from a checkpoint.
+	 */
+	public Optional<CheckpointedPosition> getReaderPosition() {
+		return Optional.ofNullable(readerPosition);
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		final String hosts = hostnames.length == 0 ? "(no host info)" : " hosts=" + Arrays.toString(hostnames);
+		return String.format("FileSourceSplit: %s [%d, %d) %s ID=%s position=%s",
+				filePath, offset, offset + length, hosts, id, readerPosition);
+	}
+
+	private static void checkNoNullHosts(String[] hosts) {
+		checkNotNull(hosts, "hostnames array must not be null");
+		for (String host : hosts) {
+			checkArgument(host != null, "the hostnames must not contain null entries");
+		}
+	}
+}
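
To make the constructor and accessor semantics above concrete, here is a small hypothetical sketch. The IDs, paths, hosts, and positions are made up; only the constructors shown in this patch and the CheckpointedPosition(offset, recordsAfterOffset) form used by the split serializer are assumed.

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.core.fs.Path;

public class FileSourceSplitSketch {

	public static void main(String[] args) {
		// a split covering an entire 1024-byte file, without host information
		final FileSourceSplit fresh =
				new FileSourceSplit("split-1", new Path("file:///data/part-0"), 0L, 1024L);

		// a split for the second half of the same file, with host info and a checkpointed
		// reader position, as it would look after being restored from a checkpoint
		final FileSourceSplit restored = new FileSourceSplit(
				"split-2",
				new Path("file:///data/part-0"),
				512L,
				512L,
				new String[] {"host-1"},
				new CheckpointedPosition(640L, 3L));

		// the reader position is absent on freshly assigned splits and present on restored ones
		System.out.println(fresh.getReaderPosition().isPresent());     // false
		System.out.println(restored.getReaderPosition().isPresent());  // true
	}
}
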
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
new file mode 100644
index 0000000..1674230
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A serializer for the {@link FileSourceSplit}.
+ */
+@PublicEvolving
+public final class FileSourceSplitSerializer implements SimpleVersionedSerializer<FileSourceSplit> {
+
+	public static final FileSourceSplitSerializer INSTANCE = new FileSourceSplitSerializer();
+
+	private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+			ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
+
+	private static final int VERSION = 1;
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public byte[] serialize(FileSourceSplit split) throws IOException {
+		checkArgument(split.getClass() == FileSourceSplit.class, "Cannot serialize subclasses of FileSourceSplit");
+
+		// optimization: the splits lazily cache their own serialized form
+		if (split.serializedFormCache != null) {
+			return split.serializedFormCache;
+		}
+
+		final DataOutputSerializer out = SERIALIZER_CACHE.get();
+
+		out.writeUTF(split.splitId());
+		split.path().write(out);
+		out.writeLong(split.offset());
+		out.writeLong(split.length());
+		writeStringArray(out, split.hostnames());
+
+		final Optional<CheckpointedPosition> readerPosition = split.getReaderPosition();
+		out.writeBoolean(readerPosition.isPresent());
+		if (readerPosition.isPresent()) {
+			out.writeLong(readerPosition.get().getOffset());
+			out.writeLong(readerPosition.get().getRecordsAfterOffset());
+		}
+
+		final byte[] result = out.getCopyOfBuffer();
+		out.clear();
+
+		// optimization: cache the serialized form, so we avoid the byte work during repeated serialization
+		split.serializedFormCache = result;
+
+		return result;
+	}
+
+	@Override
+	public FileSourceSplit deserialize(int version, byte[] serialized) throws IOException {
+		if (version == 1) {
+			return deserializeV1(serialized);
+		}
+		throw new IOException("Unknown version: " + version);
+	}
+
+	private static FileSourceSplit deserializeV1(byte[] serialized) throws IOException {
+		final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+		final String id = in.readUTF();
+		final Path path = new Path();
+		path.read(in);
+		final long offset = in.readLong();
+		final long len = in.readLong();
+		final String[] hosts = readStringArray(in);
+
+		final CheckpointedPosition readerPosition = in.readBoolean()
+				? new CheckpointedPosition(in.readLong(), in.readLong()) : null;
+
+		// instantiate a new split and cache the serialized form
+		return new FileSourceSplit(id, path, offset, len, hosts, readerPosition, serialized);
+	}
+
+	private static void writeStringArray(DataOutputView out, String[] strings) throws IOException {
+		out.writeInt(strings.length);
+		for (String string : strings) {
+			out.writeUTF(string);
+		}
+	}
+
+	private static String[] readStringArray(DataInputView in) throws IOException {
+		final int len = in.readInt();
+		final String[] strings = new String[len];
+		for (int i = 0; i < len; i++) {
+			strings[i] = in.readUTF();
+		}
+		return strings;
+	}
+}
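
A sketch of a serialization round trip with this serializer, reusing the illustrative split from above:

    FileSourceSplitSerializer serializer = FileSourceSplitSerializer.INSTANCE;

    byte[] bytes = serializer.serialize(split);   // also cached on the split for later reuse
    FileSourceSplit copy = serializer.deserialize(serializer.getVersion(), bytes);
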
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java
new file mode 100644
index 0000000..e1f6b68
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * State of the reader, essentially a mutable version of the {@link FileSourceSplit}.
+ * Has a modifiable offset and records-to-skip-count.
+ *
+ * <p>The {@link FileSourceSplit} assigned to the reader or stored in the checkpoint points to the
+ * position from where to start reading (after recovery), so the current offset and records-to-skip
+ * need to always point to the record after the last emitted record.
+ */
+@PublicEvolving
+public final class FileSourceSplitState {
+
+	private final FileSourceSplit split;
+
+	private long offset;
+
+	private long recordsToSkipAfterOffset;
+
+	public FileSourceSplitState(FileSourceSplit split) {
+		this.split = checkNotNull(split);
+
+		final Optional<CheckpointedPosition> readerPosition = split.getReaderPosition();
+		if (readerPosition.isPresent()) {
+			this.offset = readerPosition.get().getOffset();
+			this.recordsToSkipAfterOffset = readerPosition.get().getRecordsAfterOffset();
+		} else {
+			this.offset = CheckpointedPosition.NO_OFFSET;
+			this.recordsToSkipAfterOffset = 0L;
+		}
+	}
+
+	public long getOffset() {
+		return offset;
+	}
+
+	public long getRecordsToSkipAfterOffset() {
+		return recordsToSkipAfterOffset;
+	}
+
+	public void setOffset(long offset) {
+		// we skip sanity / boundary checks here for efficiency.
+		// illegal boundaries will eventually be caught when constructing the split on checkpoint.
+		this.offset = offset;
+	}
+
+	public void setRecordsToSkipAfterOffset(long recordsToSkipAfterOffset) {
+		// we skip sanity / boundary checks here for efficiency.
+		// illegal boundaries will eventually be caught when constructing the split on checkpoint.
+		this.recordsToSkipAfterOffset = recordsToSkipAfterOffset;
+	}
+
+	public void setPosition(long offset, long recordsToSkipAfterOffset) {
+		// we skip sanity / boundary checks here for efficiency.
+		// illegal boundaries will eventually be caught when constructing the split on checkpoint.
+		this.offset = offset;
+		this.recordsToSkipAfterOffset = recordsToSkipAfterOffset;
+	}
+
+	public void setPosition(CheckpointedPosition position) {
+		this.offset = position.getOffset();
+		this.recordsToSkipAfterOffset = position.getRecordsAfterOffset();
+	}
+
+	/**
+	 * Creates a new {@code FileSourceSplit} from the wrapped split, using the current offset and
+	 * records-to-skip count as the reader's checkpointed position.
+	 */
+	public FileSourceSplit toFileSourceSplit() {
+		final CheckpointedPosition position =
+				(offset == CheckpointedPosition.NO_OFFSET && recordsToSkipAfterOffset == 0) ?
+						null : new CheckpointedPosition(offset, recordsToSkipAfterOffset);
+
+		return new FileSourceSplit(
+				split.splitId(),
+				split.path(),
+				split.offset(),
+				split.length(),
+				split.hostnames(),
+				position);
+	}
+}
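
A sketch of how a reader would typically drive this state and convert it back into a split on checkpoint (the concrete positions are made up):

    FileSourceSplitState state = new FileSourceSplitState(split);

    // after emitting a record, point the state at the position *after* that record
    state.setPosition(4096L, 0L);

    // on checkpoint, freeze the mutable state into an immutable split again
    FileSourceSplit checkpointedSplit = state.toFileSourceSplit();
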
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
new file mode 100644
index 0000000..1515690
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A checkpoint of the enumerator's current state, containing the currently pending splits that are not yet assigned.
+ */
+@PublicEvolving
+public final class PendingSplitsCheckpoint {
+
+	/** The splits in the checkpoint. */
+	private final Collection<FileSourceSplit> splits;
+
+	/** The paths that are no longer in the enumerator checkpoint, but have been processed
+	 * before and should thus be ignored. Relevant only for sources in continuous monitoring mode. */
+	private final Collection<Path> alreadyProcessedPaths;
+
+	/** The cached byte representation from the last serialization step. This helps to avoid
+	 * paying repeated serialization cost for the same checkpoint object. This field is used
+	 * by {@link PendingSplitsCheckpointSerializer}. */
+	@Nullable
+	byte[] serializedFormCache;
+
+	private PendingSplitsCheckpoint(Collection<FileSourceSplit> splits, Collection<Path> alreadyProcessedPaths) {
+		this.splits = Collections.unmodifiableCollection(splits);
+		this.alreadyProcessedPaths = Collections.unmodifiableCollection(alreadyProcessedPaths);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public Collection<FileSourceSplit> getSplits() {
+		return splits;
+	}
+
+	public Collection<Path> getAlreadyProcessedPaths() {
+		return alreadyProcessedPaths;
+	}
+
+	// ------------------------------------------------------------------------
+	//  factories
+	// ------------------------------------------------------------------------
+
+	public static PendingSplitsCheckpoint fromCollectionSnapshot(Collection<FileSourceSplit> splits) {
+		checkNotNull(splits);
+
+		// create a copy of the collection to make sure this checkpoint is immutable
+		final Collection<FileSourceSplit> copy = new ArrayList<>(splits);
+		return new PendingSplitsCheckpoint(copy, Collections.emptySet());
+	}
+
+	public static PendingSplitsCheckpoint fromCollectionSnapshot(
+			Collection<FileSourceSplit> splits,
+			Collection<Path> alreadyProcessedPaths) {
+		checkNotNull(splits);
+
+		// create a copy of the collection to make sure this checkpoint is immutable
+		final Collection<FileSourceSplit> splitsCopy = new ArrayList<>(splits);
+		final Collection<Path> pathsCopy = new ArrayList<>(alreadyProcessedPaths);
+
+		return new PendingSplitsCheckpoint(splitsCopy, pathsCopy);
+	}
+
+	static PendingSplitsCheckpoint reusingCollection(Collection<FileSourceSplit> splits, Collection<Path> alreadyProcessedPaths) {
+		return new PendingSplitsCheckpoint(splits, alreadyProcessedPaths);
+	}
+}
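
A sketch of how an enumerator would snapshot its pending work with these factories (the two arguments are assumed to come from the split assigner and the enumerator's own bookkeeping):

    PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
            splitAssigner.remainingSplits(),      // splits not yet handed to readers
            pathsAlreadyProcessed);               // only relevant in continuous monitoring mode
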
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
new file mode 100644
index 0000000..e87ca7d
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A serializer for the {@link PendingSplitsCheckpoint}.
+ */
+@PublicEvolving
+public final class PendingSplitsCheckpointSerializer implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {
+
+	public static final PendingSplitsCheckpointSerializer INSTANCE = new PendingSplitsCheckpointSerializer();
+
+	private static final int VERSION = 1;
+
+	private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException {
+		checkArgument(checkpoint.getClass() == PendingSplitsCheckpoint.class,
+				"Cannot serialize subclasses of PendingSplitsCheckpoint");
+
+		// optimization: the checkpoint lazily caches its own serialized form
+		if (checkpoint.serializedFormCache != null) {
+			return checkpoint.serializedFormCache;
+		}
+
+		final FileSourceSplitSerializer serializer = FileSourceSplitSerializer.INSTANCE;
+		final Collection<FileSourceSplit> splits = checkpoint.getSplits();
+		final Collection<Path> processedPaths = checkpoint.getAlreadyProcessedPaths();
+
+		final ArrayList<byte[]> serializedSplits = new ArrayList<>(splits.size());
+		final ArrayList<byte[]> serializedPaths = new ArrayList<>(processedPaths.size());
+
+		int totalLen = 16; // four ints: magic, version, count splits, count paths
+
+		for (FileSourceSplit split : splits) {
+			final byte[] serSplit = serializer.serialize(split);
+			serializedSplits.add(serSplit);
+			totalLen += serSplit.length + 4; // 4 bytes for the length field
+		}
+
+		for (Path path : processedPaths) {
+			final byte[] serPath = path.toString().getBytes(StandardCharsets.UTF_8);
+			serializedPaths.add(serPath);
+			totalLen += serPath.length + 4; // 4 bytes for the length field
+		}
+
+		final byte[] result = new byte[totalLen];
+		final ByteBuffer byteBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
+		byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
+		byteBuffer.putInt(serializer.getVersion());
+		byteBuffer.putInt(serializedSplits.size());
+		byteBuffer.putInt(serializedPaths.size());
+
+		for (byte[] splitBytes : serializedSplits) {
+			byteBuffer.putInt(splitBytes.length);
+			byteBuffer.put(splitBytes);
+		}
+
+		for (byte[] pathBytes : serializedPaths) {
+			byteBuffer.putInt(pathBytes.length);
+			byteBuffer.put(pathBytes);
+		}
+
+		assert byteBuffer.remaining() == 0;
+
+		// optimization: cache the serialized form, so we avoid the byte work during repeated serialization
+		checkpoint.serializedFormCache = result;
+
+		return result;
+	}
+
+	@Override
+	public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) throws IOException {
+		if (version == 1) {
+			return deserializeV1(serialized);
+		}
+		throw new IOException("Unknown version: " + version);
+	}
+
+	private static PendingSplitsCheckpoint deserializeV1(byte[] serialized) throws IOException {
+		final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+		final int magic = bb.getInt();
+		if (magic != VERSION_1_MAGIC_NUMBER) {
+			throw new IOException(String.format("Invalid magic number for PendingSplitsCheckpoint. " +
+				"Expected: %X , found %X", VERSION_1_MAGIC_NUMBER, magic));
+		}
+
+		final int version = bb.getInt();
+		final int numSplits = bb.getInt();
+		final int numPaths = bb.getInt();
+
+		final FileSourceSplitSerializer serializer = FileSourceSplitSerializer.INSTANCE;
+		final ArrayList<FileSourceSplit> splits = new ArrayList<>(numSplits);
+		final ArrayList<Path> paths = new ArrayList<>(numPaths);
+
+		for (int remaining = numSplits; remaining > 0; remaining--) {
+			final byte[] bytes = new byte[bb.getInt()];
+			bb.get(bytes);
+			final FileSourceSplit split = serializer.deserialize(version, bytes);
+			splits.add(split);
+		}
+
+		for (int remaining = numPaths; remaining > 0; remaining--) {
+			final byte[] bytes = new byte[bb.getInt()];
+			bb.get(bytes);
+			final Path path = new Path(new String(bytes, StandardCharsets.UTF_8));
+			paths.add(path);
+		}
+
+		return PendingSplitsCheckpoint.reusingCollection(splits, paths);
+	}
+}
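
A round trip through this serializer looks the same as for the split serializer; a sketch, reusing the checkpoint from the previous example:

    PendingSplitsCheckpointSerializer serializer = PendingSplitsCheckpointSerializer.INSTANCE;

    byte[] bytes = serializer.serialize(checkpoint);
    PendingSplitsCheckpoint restored = serializer.deserialize(serializer.getVersion(), bytes);
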
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java
new file mode 100644
index 0000000..9182b61
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code FileSplitAssigner} is responsible for deciding what split should be processed next by
+ * which node. It determines split processing order and locality.
+ */
+@PublicEvolving
+public interface FileSplitAssigner {
+
+	/**
+	 * Gets the next split.
+	 *
+	 * <p>When this method returns an empty {@code Optional}, then the set of splits is
+	 * assumed to be done and the source will finish once the readers finished their current
+	 * splits.
+	 */
+	Optional<FileSourceSplit> getNext(@Nullable String hostname);
+
+	/**
+	 * Adds a set of splits to this assigner. This happens for example when some split processing
+	 * failed and the splits need to be re-added, or when new splits got discovered.
+	 */
+	void addSplits(Collection<FileSourceSplit> splits);
+
+	/**
+	 * Gets the remaining splits that this assigner has pending.
+	 */
+	Collection<FileSourceSplit> remainingSplits();
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory for the {@code FileSplitAssigner}, to allow the {@code FileSplitAssigner} to be eagerly
+	 * initialized and to not be serializable.
+	 */
+	@FunctionalInterface
+	interface Provider extends Serializable {
+
+		/**
+		 * Creates a new {@code FileSplitAssigner} that starts with the given set of initial splits.
+		 */
+		FileSplitAssigner create(Collection<FileSourceSplit> initialSplits);
+	}
+}
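
Since only the Provider must be serializable, a source can carry the assigner factory as a method reference; a sketch using the SimpleSplitAssigner defined below (initialSplits is a placeholder collection):

    FileSplitAssigner.Provider assignerProvider = SimpleSplitAssigner::new;

    // later, on the enumerator, with the initially enumerated splits
    FileSplitAssigner assigner = assignerProvider.create(initialSplits);
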
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java
new file mode 100644
index 0000000..ed52f97
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code SimpleSplitAssigner} hands out splits in an arbitrary order, without any consideration
+ * for locality.
+ */
+@PublicEvolving
+public class SimpleSplitAssigner implements FileSplitAssigner {
+
+	private final ArrayList<FileSourceSplit> splits;
+
+	public SimpleSplitAssigner(Collection<FileSourceSplit> splits) {
+		this.splits = new ArrayList<>(splits);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Optional<FileSourceSplit> getNext(String hostname) {
+		final int size = splits.size();
+		return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
+	}
+
+	@Override
+	public void addSplits(Collection<FileSourceSplit> newSplits) {
+		splits.addAll(newSplits);
+	}
+
+	@Override
+	public Collection<FileSourceSplit> remainingSplits() {
+		return splits;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "SimpleSplitAssigner " + splits;
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java
new file mode 100644
index 0000000..14a732e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.compression;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory;
+import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.XZInputStreamFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A collection of common compression formats and de-compressors.
+ */
+@PublicEvolving
+public final class StandardDeCompressors {
+
+	/** All supported file compression formats, by common file extensions. */
+	private static final Map<String, InflaterInputStreamFactory<?>> DECOMPRESSORS =
+			buildDecompressorMap(
+					DeflateInflaterInputStreamFactory.getInstance(),
+					GzipInflaterInputStreamFactory.getInstance(),
+					Bzip2InputStreamFactory.getInstance(),
+					XZInputStreamFactory.getInstance());
+
+	/** All common file extensions of supported file compression formats. */
+	private static final Collection<String> COMMON_SUFFIXES =
+			Collections.unmodifiableList(new ArrayList<>(DECOMPRESSORS.keySet()));
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets all common file extensions of supported file compression formats.
+	 */
+	public static Collection<String> getCommonSuffixes() {
+		return COMMON_SUFFIXES;
+	}
+
+	/**
+	 * Gets the decompressor for a file extension. Returns null if there is no decompressor for this
+	 * file extension.
+	 */
+	@Nullable
+	public static InflaterInputStreamFactory<?> getDecompressorForExtension(String extension) {
+		return DECOMPRESSORS.get(extension);
+	}
+
+	/**
+	 * Gets the decompressor for a file name. This checks the file against all known and supported
+	 * file extensions.
+	 * Returns null if there is no decompressor for this file name.
+	 */
+	@Nullable
+	public static InflaterInputStreamFactory<?> getDecompressorForFileName(String fileName) {
+		for (final Map.Entry<String, InflaterInputStreamFactory<?>> entry : DECOMPRESSORS.entrySet()) {
+			if (fileName.endsWith(entry.getKey())) {
+				return entry.getValue();
+			}
+		}
+		return null;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static Map<String, InflaterInputStreamFactory<?>> buildDecompressorMap(
+			final InflaterInputStreamFactory<?>... decompressors) {
+
+		final LinkedHashMap<String, InflaterInputStreamFactory<?>> map = new LinkedHashMap<>(decompressors.length);
+		for (InflaterInputStreamFactory<?> decompressor : decompressors) {
+			for (String suffix : decompressor.getCommonFileExtensions()) {
+				map.put(suffix, decompressor);
+			}
+		}
+		return map;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class has purely static utility methods and is not meant to be instantiated. */
+	private StandardDeCompressors() {}
+}
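
A sketch of how a caller might use the file-name lookup (the file name is illustrative; whether a given suffix is registered depends on the factories above):

    InflaterInputStreamFactory<?> factory =
            StandardDeCompressors.getDecompressorForFileName("data/part-0.gz");

    if (factory == null) {
        // no known compression suffix: read the file as an uncompressed stream
    }
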
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java
new file mode 100644
index 0000000..f475a68
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively,
+ * and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. For file
+ * systems that do not expose block information, the enumerator will not create multiple splits per
+ * file, but keep each file as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2', ...)
+ * will not be split. See {@link StandardDeCompressors} for a list of known formats and suffixes.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file prefixes
+ * '.' and '_'. A custom file filter can be specified.
+ */
+@PublicEvolving
+public class BlockSplittingRecursiveEnumerator extends NonSplittingRecursiveEnumerator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BlockSplittingRecursiveEnumerator.class);
+
+	private final String[] nonSplittableFileSuffixes;
+
+	/**
+	 * Creates a new enumerator that enumerates all files except hidden files.
+	 * Hidden files are considered files where the filename starts with '.' or with '_'.
+	 *
+	 * <p>The enumerator does not split files that have a suffix corresponding to a known
+	 * compression format (for example '.gzip', '.bz2', '.xz', '.zip', ...).
+	 * See {@link StandardDeCompressors} for details.
+	 */
+	public BlockSplittingRecursiveEnumerator() {
+		this(new DefaultFileFilter(), StandardDeCompressors.getCommonSuffixes().toArray(new String[0]));
+	}
+
+	/**
+	 * Creates a new enumerator that uses the given predicate as a filter
+	 * for file paths, and avoids splitting files with the given extension (typically
+	 * to avoid splitting compressed files).
+	 */
+	public BlockSplittingRecursiveEnumerator(
+			final Predicate<Path> fileFilter,
+			final String[] nonSplittableFileSuffixes) {
+		super(fileFilter);
+		this.nonSplittableFileSuffixes = checkNotNull(nonSplittableFileSuffixes);
+	}
+
+	protected void convertToSourceSplits(
+			final FileStatus file,
+			final FileSystem fs,
+			final List<FileSourceSplit> target) throws IOException {
+
+		if (!isFileSplittable(file.getPath())) {
+			super.convertToSourceSplits(file, fs, target);
+			return;
+		}
+
+		final BlockLocation[] blocks = getBlockLocationsForFile(file, fs);
+		if (blocks == null) {
+			target.add(new FileSourceSplit(getNextId(), file.getPath(), 0L, file.getLen()));
+		} else {
+			for (BlockLocation block : blocks) {
+				target.add(new FileSourceSplit(
+						getNextId(),
+						file.getPath(),
+						block.getOffset(),
+						block.getLength(),
+						block.getHosts()));
+			}
+		}
+	}
+
+	protected boolean isFileSplittable(Path filePath) {
+		if (nonSplittableFileSuffixes.length == 0) {
+			return true;
+		}
+
+		final String path = filePath.getPath();
+		for (String suffix : nonSplittableFileSuffixes) {
+			if (path.endsWith(suffix)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Nullable
+	private static BlockLocation[] getBlockLocationsForFile(FileStatus file, FileSystem fs) throws IOException {
+		final long len = file.getLen();
+
+		final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
+		if (blocks == null || blocks.length == 0) {
+			return null;
+		}
+
+		// a cheap check whether we have all blocks. we don't check whether the blocks fully cover the
+		// file (too expensive) but make some sanity checks to catch the common cases early where
+		// incorrect block info is returned by the implementation.
+
+		long totalLen = 0L;
+		for (BlockLocation block : blocks) {
+			totalLen += block.getLength();
+		}
+		if (totalLen != len) {
+			LOG.warn("Block lengths do not match file length for {}. File length is {}, blocks are {}",
+					file.getPath(), len, Arrays.toString(blocks));
+			return null;
+		}
+
+		return blocks;
+	}
+}
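
A sketch of constructing the enumerator with a custom filter while keeping the default non-splittable suffixes (the filter predicate is made up):

    FileEnumerator enumerator = new BlockSplittingRecursiveEnumerator(
            path -> !path.getName().endsWith(".tmp"),
            StandardDeCompressors.getCommonSuffixes().toArray(new String[0]));
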
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java
new file mode 100644
index 0000000..7ac7d46
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.util.function.Predicate;
+
+/**
+ * A file filter that filters out hidden files based on common naming patterns,
+ * i.e., files where the filename starts with '.' or with '_'.
+ */
+@PublicEvolving
+public final class DefaultFileFilter implements Predicate<Path> {
+
+	@Override
+	public boolean test(Path path) {
+		final String fileName = path.getName();
+		if (fileName == null || fileName.length() == 0) {
+			return true;
+		}
+		final char first = fileName.charAt(0);
+		return first != '.' && first != '_';
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java
new file mode 100644
index 0000000..67c2790
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The {@code FileEnumerator}'s task is to discover all files to be read and to split them into
+ * a set of {@link FileSourceSplit}.
+ *
+ * <p>This possibly includes path traversals, file filtering (by name or other patterns), and
+ * deciding whether to split files into multiple splits, and how to split them.
+ */
+@PublicEvolving
+public interface FileEnumerator {
+
+	/**
+	 * Generates all file splits for the relevant files under the given paths.
+	 * The {@code minDesiredSplits} is an optional hint indicating how many splits would be necessary
+	 * to exploit parallelism properly.
+	 */
+	Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory for the {@code FileEnumerator}, to allow the {@code FileEnumerator} to be eagerly
+	 * initialized and to not be serializable.
+	 */
+	@FunctionalInterface
+	interface Provider extends Serializable {
+
+		FileEnumerator create();
+	}
+}
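
As with the split assigner, only the Provider is serialized; a sketch using the NonSplittingRecursiveEnumerator defined below (the path is made up, and the checked IOException is ignored for brevity):

    FileEnumerator.Provider enumeratorProvider = NonSplittingRecursiveEnumerator::new;

    Collection<FileSourceSplit> splits = enumeratorProvider.create()
            .enumerateSplits(new Path[] {new Path("file:///data")}, 1);
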
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java
new file mode 100644
index 0000000..a9a06f3
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively.
+ * Each file becomes one split; this enumerator does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file prefixes
+ * '.' and '_'. A custom file filter can be specified.
+ */
+@PublicEvolving
+public class NonSplittingRecursiveEnumerator implements FileEnumerator {
+
+	/** The filter predicate to filter out unwanted files. */
+	private final Predicate<Path> fileFilter;
+
+	/** The current Id as a mutable string representation. This covers more values than the
+	 * integer value range, so we should never overflow. */
+	private final char[] currentId = "0000000000".toCharArray();
+
+	/**
+	 * Creates a NonSplittingRecursiveEnumerator that enumerates all files except hidden files.
+	 * Hidden files are considered files where the filename starts with '.' or with '_'.
+	 */
+	public NonSplittingRecursiveEnumerator() {
+		this(new DefaultFileFilter());
+	}
+
+	/**
+	 * Creates a NonSplittingRecursiveEnumerator that uses the given predicate as a filter
+	 * for file paths.
+	 */
+	public NonSplittingRecursiveEnumerator(Predicate<Path> fileFilter) {
+		this.fileFilter = checkNotNull(fileFilter);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException {
+		final ArrayList<FileSourceSplit> splits = new ArrayList<>();
+
+		for (Path path : paths) {
+			final FileSystem fs = path.getFileSystem();
+			final FileStatus status = fs.getFileStatus(path);
+			addSplitsForPath(status, fs, splits);
+		}
+
+		return splits;
+	}
+
+	private void addSplitsForPath(FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target) throws IOException {
+		if (!fileFilter.test(fileStatus.getPath())) {
+			return;
+		}
+
+		if (!fileStatus.isDir()) {
+			convertToSourceSplits(fileStatus, fs, target);
+			return;
+		}
+
+		final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
+		for (FileStatus containedStatus : containedFiles) {
+			addSplitsForPath(containedStatus, fs, target);
+		}
+	}
+
+	protected void convertToSourceSplits(
+			final FileStatus file,
+			final FileSystem fs,
+			final List<FileSourceSplit> target) throws IOException {
+
+		final String[] hosts = getHostsFromBlockLocations(fs.getFileBlockLocations(file, 0L, file.getLen()));
+		target.add(new FileSourceSplit(getNextId(), file.getPath(), 0, file.getLen(), hosts));
+	}
+
+	protected final String getNextId() {
+		// because we just increment numbers, we increment the char representation directly,
+		// rather than incrementing an integer and converting it to a string representation
+		// every time again (requires quite some expensive conversion logic).
+		incrementCharArrayByOne(currentId, currentId.length - 1);
+		return new String(currentId);
+	}
+
+	private static String[] getHostsFromBlockLocations(BlockLocation[] blockLocations) throws IOException {
+		if (blockLocations.length == 0) {
+			return StringUtils.EMPTY_STRING_ARRAY;
+		}
+		if (blockLocations.length == 1) {
+			return blockLocations[0].getHosts();
+		}
+		final LinkedHashSet<String> allHosts = new LinkedHashSet<>();
+		for (BlockLocation block : blockLocations) {
+			allHosts.addAll(Arrays.asList(block.getHosts()));
+		}
+		return allHosts.toArray(new String[allHosts.size()]);
+	}
+
+	private static void incrementCharArrayByOne(char[] array, int pos) {
+		char c = array[pos];
+		c++;
+
+		if (c > '9') {
+			c = '0';
+			incrementCharArrayByOne(array, pos - 1);
+		}
+		array[pos] = c;
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
new file mode 100644
index 0000000..72fa0ad
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A continuously monitoring enumerator that periodically re-enumerates the given paths,
+ * tracks already-processed files, and assigns newly discovered splits to readers on request.
+ */
+@Internal
+public class ContinuousFileSplitEnumerator implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
+
+	private final SplitEnumeratorContext<FileSourceSplit> context;
+
+	private final FileSplitAssigner splitAssigner;
+
+	private final FileEnumerator enumerator;
+
+	private final HashSet<Path> pathsAlreadyProcessed;
+
+	private final LinkedHashMap<Integer, String> readersAwaitingSplit;
+
+	private final Path[] paths;
+
+	private final long discoveryInterval;
+
+	// ------------------------------------------------------------------------
+
+	public ContinuousFileSplitEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> context,
+			FileEnumerator enumerator,
+			FileSplitAssigner splitAssigner,
+			Path[] paths,
+			Collection<Path> alreadyDiscoveredPaths,
+			long discoveryInterval) {
+
+		checkArgument(discoveryInterval > 0L);
+		this.context = checkNotNull(context);
+		this.enumerator = checkNotNull(enumerator);
+		this.splitAssigner = checkNotNull(splitAssigner);
+		this.paths = paths;
+		this.discoveryInterval = discoveryInterval;
+		this.pathsAlreadyProcessed = new HashSet<>(alreadyDiscoveredPaths);
+		this.readersAwaitingSplit = new LinkedHashMap<>();
+	}
+
+	@Override
+	public void start() {
+		context.callAsync(
+			() -> enumerator.enumerateSplits(paths, 1),
+			this::processDiscoveredSplits,
+			discoveryInterval, discoveryInterval);
+	}
+
+	@Override
+	public void close() throws IOException {
+		// no resources to close
+	}
+
+	@Override
+	public void addReader(int subtaskId) {
+		// this source is purely lazy-pull-based, nothing to do upon registration
+	}
+
+	@Override
+	public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+		if (sourceEvent instanceof RequestSplitEvent) {
+			readersAwaitingSplit.put(subtaskId, ((RequestSplitEvent) sourceEvent).hostName());
+			assignSplits();
+		}
+		else {
+			LOG.error("Received unrecognized event: {}", sourceEvent);
+		}
+	}
+
+	@Override
+	public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+		LOG.debug("File Source Enumerator adds splits back: {}", splits);
+		splitAssigner.addSplits(splits);
+	}
+
+	@Override
+	public PendingSplitsCheckpoint snapshotState() throws Exception {
+		return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits(), pathsAlreadyProcessed);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error) {
+		if (error != null) {
+			LOG.error("Failed to enumerate files", error);
+			return;
+		}
+
+		final Collection<FileSourceSplit> newSplits = splits.stream()
+				.filter((split) -> pathsAlreadyProcessed.add(split.path()))
+				.collect(Collectors.toList());
+		splitAssigner.addSplits(newSplits);
+
+		assignSplits();
+	}
+
+	private void assignSplits() {
+		final Iterator<Map.Entry<Integer, String>> awaitingReader = readersAwaitingSplit.entrySet().iterator();
+
+		while (awaitingReader.hasNext()) {
+			final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
+			final String hostname = nextAwaiting.getValue();
+			final int awaitingSubtask = nextAwaiting.getKey();
+			final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
+			if (nextSplit.isPresent()) {
+				context.assignSplit(nextSplit.get(), awaitingSubtask);
+				awaitingReader.remove();
+			} else {
+				break;
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java
new file mode 100644
index 0000000..e084926
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.IteratorResultIterator;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code FileRecordFormatAdapter} turns a {@link FileRecordFormat} into a {@link BulkFormat}.
+ */
+@Internal
+public final class FileRecordFormatAdapter<T> implements BulkFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final FileRecordFormat<T> fileFormat;
+
+	public FileRecordFormatAdapter(FileRecordFormat<T> fileFormat) {
+		this.fileFormat = checkNotNull(fileFormat);
+	}
+
+	@Override
+	public BulkFormat.Reader<T> createReader(
+			final Configuration config,
+			final Path filePath,
+			final long splitOffset,
+			final long splitLength) throws IOException {
+
+		final FileRecordFormat.Reader<T> reader = fileFormat.createReader(config, filePath, splitOffset, splitLength);
+		return doWithCleanupOnException(reader, () -> {
+			//noinspection CodeBlock2Expr
+			return wrapReader(reader, config, CheckpointedPosition.NO_OFFSET, 0L);
+		});
+	}
+
+	@Override
+	public BulkFormat.Reader<T> restoreReader(
+			final Configuration config,
+			final Path filePath,
+			final long splitOffset,
+			final long splitLength,
+			final CheckpointedPosition checkpointedPosition) throws IOException {
+
+		final FileRecordFormat.Reader<T> reader = checkpointedPosition.getOffset() == CheckpointedPosition.NO_OFFSET
+				? fileFormat.createReader(config, filePath, splitOffset, splitLength)
+				: fileFormat.restoreReader(config, filePath, checkpointedPosition.getOffset(), splitOffset, splitLength);
+
+		return doWithCleanupOnException(reader, () -> {
+			long remaining = checkpointedPosition.getRecordsAfterOffset();
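+			// fast-forward over the records that were already emitted before the checkpoint,
+			// so the wrapped reader resumes exactly after the checkpointed position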
+			while (remaining > 0 && reader.read() != null) {
+				remaining--;
+			}
+
+			return wrapReader(reader, config, checkpointedPosition.getOffset(), checkpointedPosition.getRecordsAfterOffset());
+		});
+	}
+
+	@Override
+	public boolean isSplittable() {
+		return fileFormat.isSplittable();
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return fileFormat.getProducedType();
+	}
+
+	private static <T> Reader<T> wrapReader(
+			final FileRecordFormat.Reader<T> reader,
+			final Configuration config,
+			final long startingOffset,
+			final long startingSkipCount) {
+
+		final int numRecordsPerBatch = config.get(FileRecordFormat.RECORDS_PER_FETCH);
+		return new Reader<>(reader, numRecordsPerBatch, startingOffset, startingSkipCount);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The reader adapter, from {@link FileRecordFormat.Reader} to {@link BulkFormat.Reader}.
+	 */
+	public static final class Reader<T> implements BulkFormat.Reader<T> {
+
+		private final FileRecordFormat.Reader<T> reader;
+		private final int numRecordsPerBatch;
+
+		private long lastOffset;
+		private long lastRecordsAfterOffset;
+
+		Reader(
+				final FileRecordFormat.Reader<T> reader,
+				final int numRecordsPerBatch,
+				final long initialOffset,
+				final long initialSkipCount) {
+			checkArgument(numRecordsPerBatch > 0, "numRecordsPerBatch must be > 0");
+			this.reader = checkNotNull(reader);
+			this.numRecordsPerBatch = numRecordsPerBatch;
+			this.lastOffset = initialOffset;
+			this.lastRecordsAfterOffset = initialSkipCount;
+		}
+
+		@Nullable
+		@Override
+		public RecordIterator<T> readBatch() throws IOException {
+			updateCheckpointablePosition();
+
+			final ArrayList<T> result = new ArrayList<>(numRecordsPerBatch);
+			T next;
+			int remaining = numRecordsPerBatch;
+			while (remaining-- > 0 && ((next = reader.read()) != null)) {
+				result.add(next);
+			}
+
+			if (result.isEmpty()) {
+				return null;
+			}
+
+			final RecordIterator<T> iter = new IteratorResultIterator<>(result.iterator(), lastOffset, lastRecordsAfterOffset);
+			lastRecordsAfterOffset += result.size();
+			return iter;
+		}
+
+		@Override
+		public void close() throws IOException {
+			reader.close();
+		}
+
+		private void updateCheckpointablePosition() {
+			final CheckpointedPosition position = reader.getCheckpointedPosition();
+			if (position != null) {
+				this.lastOffset = position.getOffset();
+				this.lastRecordsAfterOffset = position.getRecordsAfterOffset();
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java
new file mode 100644
index 0000000..b3d2157f
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A collection of records for one file split.
+ *
+ * <p>This is essentially a slim wrapper around the {@link BulkFormat.RecordIterator} that only
+ * adds information about the current split, or finished splits (to keep knowledge about current split
+ * IDs out of the reader formats).
+ */
+@Internal
+public final class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+	@Nullable
+	private String splitId;
+
+	@Nullable
+	private BulkFormat.RecordIterator<T> recordsForSplitCurrent;
+
+	@Nullable
+	private final BulkFormat.RecordIterator<T> recordsForSplit;
+
+	private final Set<String> finishedSplits;
+
+	private FileRecords(
+			@Nullable String splitId,
+			@Nullable BulkFormat.RecordIterator<T> recordsForSplit,
+			Set<String> finishedSplits) {
+
+		this.splitId = splitId;
+		this.recordsForSplit = recordsForSplit;
+		this.finishedSplits = finishedSplits;
+	}
+
+	@Nullable
+	@Override
+	public String nextSplit() {
+		// advance the split (from the current value to null)
+		final String nextSplit = this.splitId;
+		this.splitId = null;
+
+		// move the iterator, from null to value (if first move) or to null (if second move)
+		this.recordsForSplitCurrent = nextSplit != null ? this.recordsForSplit : null;
+
+		return nextSplit;
+	}
+
+	@Nullable
+	@Override
+	public RecordAndPosition<T> nextRecordFromSplit() {
+		final BulkFormat.RecordIterator<T> recordsForSplit = this.recordsForSplitCurrent;
+		if (recordsForSplit != null) {
+			return recordsForSplit.next();
+		} else {
+			throw new IllegalStateException();
+		}
+	}
+
+	@Override
+	public void recycle() {
+		if (recordsForSplit != null) {
+			recordsForSplit.releaseBatch();
+		}
+	}
+
+	@Override
+	public Set<String> finishedSplits() {
+		return finishedSplits;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static <T> FileRecords<T> forRecords(
+			final String splitId,
+			final BulkFormat.RecordIterator<T> recordsForSplit) {
+		return new FileRecords<>(splitId, recordsForSplit, Collections.emptySet());
+	}
+
+	public static <T> FileRecords<T> finishedSplit(String splitId) {
+		return new FileRecords<>(null, null, Collections.singleton(splitId));
+	}
+}
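
For orientation, a small consumption sketch (the drain() helper is hypothetical and not part of the
commit above) of how a caller such as the base source reader is expected to walk a FileRecords
instance: nextSplit() moves to the wrapped split on the first call and to null on the second,
records are drained until the iterator is exhausted, and recycle() releases the underlying batch.

    static <T> void drain(FileRecords<T> records) {
        String splitId;
        while ((splitId = records.nextSplit()) != null) {
            // read all records of the current split, each carrying its resume position
            RecordAndPosition<T> rec;
            while ((rec = records.nextRecordFromSplit()) != null) {
                System.out.println("split " + splitId + ": " + rec.getRecord());
            }
        }
        // signal that the batch is no longer used, so heavyweight structures can be recycled
        records.recycle();
    }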
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
new file mode 100644
index 0000000..08c965e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.FileSourceSplitState;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import java.util.Collection;
+
+/**
+ * A {@link SourceReader} that reads records from a {@link FileSourceSplit}.
+ */
+@Internal
+public final class FileSourceReader<T>
+		extends SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, FileSourceSplit, FileSourceSplitState> {
+
+	public FileSourceReader(SourceReaderContext readerContext, BulkFormat<T> readerFormat, Configuration config) {
+		super(
+			() -> new FileSourceSplitReader<>(config, readerFormat),
+			new FileSourceRecordEmitter<>(),
+			config,
+			readerContext);
+	}
+
+	@Override
+	public void start() {
+		requestSplit();
+	}
+
+	@Override
+	protected void onSplitFinished(Collection<String> finishedSplitIds) {
+		requestSplit();
+	}
+
+	@Override
+	protected FileSourceSplitState initializedState(FileSourceSplit split) {
+		return new FileSourceSplitState(split);
+	}
+
+	@Override
+	protected FileSourceSplit toSplitType(String splitId, FileSourceSplitState splitState) {
+		return splitState.toFileSourceSplit();
+	}
+
+	private void requestSplit() {
+		context.sendSourceEventToCoordinator(new RequestSplitEvent(context.getLocalHostName()));
+	}
+}
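
To place the reader in context, a usage sketch of running the new source in a DataStream job with
the TextLineFormat also added by this commit. The FileSource.forRecordStreamFormat(...) factory
name is an assumption about the FileSource class introduced earlier in this commit and may differ;
the rest uses standard Flink 1.11+ APIs.

    // assumption: a factory on FileSource that wraps a StreamFormat for the given paths
    final FileSource<String> source =
            FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///logs/input"));

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // the FLIP-27 entry point; a FileSourceReader (above) is created once per parallel subtask
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
       .print();

    env.execute("FLIP-27 file source example");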
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java
new file mode 100644
index 0000000..5f55159
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java
@@ -0,0 +1,47 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.FileSourceSplitState;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+/**
+ * The {@link RecordEmitter} implementation for {@link FileSourceReader}.
+ *
+ * <p>This updates the {@link FileSourceSplit} for every emitted record.
+ * Because the {@link FileSourceSplit} points to the position from where to start reading (after recovery),
+ * the current offset and records-to-skip need to always point to the record after the emitted record.
+ */
+@Internal
+final class FileSourceRecordEmitter<T> implements RecordEmitter<RecordAndPosition<T>, T, FileSourceSplitState> {
+
+	@Override
+	public void emitRecord(
+			final RecordAndPosition<T> element,
+			final SourceOutput<T> output,
+			final FileSourceSplitState splitState) {
+
+		output.collect(element.getRecord());
+		splitState.setPosition(element.getOffset(), element.getRecordSkipCount());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java
new file mode 100644
index 0000000..ed9159b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * The {@link SplitReader} implementation for the file source.
+ */
+@Internal
+final class FileSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, FileSourceSplit> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileSourceSplitReader.class);
+
+	private final Configuration config;
+	private final BulkFormat<T> readerFactory;
+
+	private final Queue<FileSourceSplit> splits;
+
+	@Nullable
+	private BulkFormat.Reader<T> currentReader;
+	@Nullable
+	private String currentSplitId;
+
+	public FileSourceSplitReader(Configuration config, BulkFormat<T> readerFactory) {
+		this.config = config;
+		this.readerFactory = readerFactory;
+		this.splits = new ArrayDeque<>();
+	}
+
+	@Override
+	public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
+		checkSplitOrStartNext();
+
+		final BulkFormat.RecordIterator<T> nextBatch = currentReader.readBatch();
+		return nextBatch == null ? finishSplit() : FileRecords.forRecords(currentSplitId, nextBatch);
+	}
+
+	@Override
+	public void handleSplitsChanges(final SplitsChange<FileSourceSplit> splitChange) {
+		if (!(splitChange instanceof SplitsAddition)) {
+			throw new UnsupportedOperationException(String.format(
+					"The SplitChange type of %s is not supported.", splitChange.getClass()));
+		}
+
+		LOG.debug("Handling split change {}", splitChange);
+		splits.addAll(splitChange.splits());
+	}
+
+	@Override
+	public void wakeUp() {}
+
+	private void checkSplitOrStartNext() throws IOException {
+		if (currentReader != null) {
+			return;
+		}
+
+		final FileSourceSplit nextSplit = splits.poll();
+		if (nextSplit == null) {
+			throw new IOException("Cannot fetch from another split - no split remaining");
+		}
+
+		currentSplitId = nextSplit.splitId();
+
+		final Optional<CheckpointedPosition> position = nextSplit.getReaderPosition();
+		currentReader = position.isPresent()
+				? readerFactory.restoreReader(config, nextSplit.path(), nextSplit.offset(), nextSplit.length(), position.get())
+				: readerFactory.createReader(config, nextSplit.path(), nextSplit.offset(), nextSplit.length());
+	}
+
+	private FileRecords<T> finishSplit() throws IOException {
+		if (currentReader != null) {
+			currentReader.close();
+			currentReader = null;
+		}
+
+		final FileRecords<T> finishRecords = FileRecords.finishedSplit(currentSplitId);
+		currentSplitId = null;
+		return finishRecords;
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
new file mode 100644
index 0000000..05d7897
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A SplitEnumerator implementation for bounded / batch {@link FileSource} input.
+ *
+ * <p>This enumerator takes all files that are present in the configured input directories and assigns
+ * them to the readers. Once all files are processed, the source is finished.
+ *
+ * <p>The implementation of this class is rather thin. The actual logic for creating the set of
+ * FileSourceSplits to process, and the logic to decide which reader gets what split, are in
+ * {@link FileEnumerator} and in {@link FileSplitAssigner}, respectively.
+ */
+@Internal
+public class StaticFileSplitEnumerator implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StaticFileSplitEnumerator.class);
+
+	private final SplitEnumeratorContext<FileSourceSplit> context;
+
+	private final FileSplitAssigner splitAssigner;
+
+	// ------------------------------------------------------------------------
+
+	public StaticFileSplitEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> context,
+			FileSplitAssigner splitAssigner) {
+		this.context = checkNotNull(context);
+		this.splitAssigner = checkNotNull(splitAssigner);
+	}
+
+	@Override
+	public void start() {
+		// no resources to start
+	}
+
+	@Override
+	public void close() throws IOException {
+		// no resources to close
+	}
+
+	@Override
+	public void addReader(int subtaskId) {
+		// this source is purely lazy-pull-based, nothing to do upon registration
+	}
+
+	@Override
+	public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+		if (sourceEvent instanceof RequestSplitEvent) {
+			final RequestSplitEvent requestSplitEvent = (RequestSplitEvent) sourceEvent;
+			assignNextEvents(subtaskId, requestSplitEvent.hostName());
+		}
+		else {
+			LOG.error("Received unrecognized event: {}", sourceEvent);
+		}
+	}
+
+	@Override
+	public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+		LOG.debug("File Source Enumerator adds splits back: {}", splits);
+		splitAssigner.addSplits(splits);
+	}
+
+	@Override
+	public PendingSplitsCheckpoint snapshotState() {
+		return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void assignNextEvents(int subtask, @Nullable String hostname) {
+		if (LOG.isInfoEnabled()) {
+			final String hostInfo = hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
+			LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
+		}
+
+		final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
+		if (nextSplit.isPresent()) {
+			final FileSourceSplit split = nextSplit.get();
+			context.assignSplit(split, subtask);
+			LOG.info("Assigned split to subtask {} : {}", subtask, split);
+		}
+		else {
+			context.sendEventToSourceReader(subtask, new NoMoreSplitsEvent());
+			LOG.info("No more splits available for subtask {}", subtask);
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java
new file mode 100644
index 0000000..e085a0e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.IteratorResultIterator;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.MathUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter to turn a {@link StreamFormat} into a {@link BulkFormat}.
+ */
+@Internal
+public final class StreamFormatAdapter<T> implements BulkFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final StreamFormat<T> streamFormat;
+
+	public StreamFormatAdapter(StreamFormat<T> streamFormat) {
+		this.streamFormat = checkNotNull(streamFormat);
+	}
+
+	@Override
+	public BulkFormat.Reader<T> createReader(
+			final Configuration config,
+			final Path filePath,
+			final long splitOffset,
+			final long splitLength) throws IOException {
+
+		final TrackingFsDataInputStream trackingStream = openStream(filePath, config, splitOffset);
+
+		return doWithCleanupOnException(trackingStream, () -> {
+			final StreamFormat.Reader<T> streamReader = streamFormat.createReader(
+					config, trackingStream, trackingStream.getFileLength(), splitOffset + splitLength);
+			return new Reader<>(streamReader, trackingStream, CheckpointedPosition.NO_OFFSET, 0L);
+		});
+	}
+
+	@Override
+	public BulkFormat.Reader<T> restoreReader(
+			final Configuration config,
+			final Path filePath,
+			final long splitOffset,
+			final long splitLength,
+			final CheckpointedPosition checkpointedPosition) throws IOException {
+
+		final TrackingFsDataInputStream trackingStream = openStream(filePath, config, splitOffset);
+
+		return doWithCleanupOnException(trackingStream, () -> {
+			// if there never was a checkpointed offset, yet, we need to initialize the reader like a fresh reader.
+			// see the JavaDocs on StreamFormat.restoreReader() for details
+			final StreamFormat.Reader<T> streamReader = checkpointedPosition.getOffset() == CheckpointedPosition.NO_OFFSET
+					? streamFormat.createReader(
+							config, trackingStream, trackingStream.getFileLength(), splitOffset + splitLength)
+					: streamFormat.restoreReader(
+							config, trackingStream, checkpointedPosition.getOffset(),
+							trackingStream.getFileLength(), splitOffset + splitLength);
+
+			// skip the records to skip, but make sure we close the reader if something goes wrong
+			doWithCleanupOnException(streamReader, () -> {
+				long toSkip = checkpointedPosition.getRecordsAfterOffset();
+				while (toSkip > 0 && streamReader.read() != null) {
+					toSkip--;
+				}
+			});
+
+			return new Reader<>(
+					streamReader, trackingStream,
+					checkpointedPosition.getOffset(), checkpointedPosition.getRecordsAfterOffset());
+		});
+	}
+
+	@Override
+	public boolean isSplittable() {
+		return streamFormat.isSplittable();
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return streamFormat.getProducedType();
+	}
+
+	private static TrackingFsDataInputStream openStream(
+			final Path file,
+			final Configuration config,
+			final long seekPosition) throws IOException {
+
+		final FileSystem fs = file.getFileSystem();
+		final long fileLength = fs.getFileStatus(file).getLen();
+
+		final int fetchSize = MathUtils.checkedDownCast(config.get(StreamFormat.FETCH_IO_SIZE).getBytes());
+		if (fetchSize <= 0) {
+			throw new IllegalConfigurationException(
+					String.format("The fetch size (%s) must be > 0, but is %d",
+							StreamFormat.FETCH_IO_SIZE.key(), fetchSize));
+		}
+
+		final FSDataInputStream inStream = fs.open(file);
+		return doWithCleanupOnException(inStream, () -> {
+			inStream.seek(seekPosition);
+			return new TrackingFsDataInputStream(inStream, fileLength, fetchSize);
+		});
+
+	}
+
+	// ----------------------------------------------------------------------------------
+
+	/**
+	 * The reader adapter, from {@link StreamFormat.Reader} to {@link BulkFormat.Reader}.
+	 */
+	public static final class Reader<T> implements BulkFormat.Reader<T> {
+
+		private final StreamFormat.Reader<T> reader;
+		private final TrackingFsDataInputStream stream;
+		private long lastOffset;
+		private long lastRecordsAfterOffset;
+
+		Reader(
+				final StreamFormat.Reader<T> reader,
+				final TrackingFsDataInputStream stream,
+				final long initialOffset,
+				final long initialSkipCount) {
+
+			this.reader = checkNotNull(reader);
+			this.stream = checkNotNull(stream);
+			this.lastOffset = initialOffset;
+			this.lastRecordsAfterOffset = initialSkipCount;
+		}
+
+		@Nullable
+		@Override
+		public RecordIterator<T> readBatch() throws IOException {
+			updateCheckpointedPosition();
+			stream.newBatch();
+
+			final ArrayList<T> result = new ArrayList<>();
+			T next;
+			while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
+				result.add(next);
+			}
+
+			if (result.isEmpty()) {
+				return null;
+			}
+
+			final RecordIterator<T> iter = new IteratorResultIterator<>(
+					result.iterator(), lastOffset, lastRecordsAfterOffset);
+			lastRecordsAfterOffset += result.size();
+			return iter;
+		}
+
+		@Override
+		public void close() throws IOException {
+			try {
+				reader.close();
+			} finally {
+				// this is just in case, to guard against resource leaks
+				IOUtils.closeQuietly(stream);
+			}
+		}
+
+		private void updateCheckpointedPosition() {
+			final CheckpointedPosition position = reader.getCheckpointedPosition();
+			if (position != null) {
+				this.lastOffset = position.getOffset();
+				this.lastRecordsAfterOffset = position.getRecordsAfterOffset();
+			}
+		}
+	}
+
+	// ----------------------------------------------------------------------------------
+
+	/**
+	 * Utility stream that tracks how much has been read. This is used to decide when the reader
+	 * should finish the current batch and start the next batch. That way we make the batch sizes
+	 * dependent on the consumed data volume, which is more robust than making it dependent on a
+	 * record count.
+	 */
+	private static final class TrackingFsDataInputStream extends FSDataInputStream {
+
+		private final FSDataInputStream stream;
+		private final long fileLength;
+		private final int batchSize;
+		private int remainingInBatch;
+
+		TrackingFsDataInputStream(FSDataInputStream stream, long fileLength, int batchSize) {
+			checkArgument(fileLength > 0L);
+			checkArgument(batchSize > 0);
+			this.stream = stream;
+			this.fileLength = fileLength;
+			this.batchSize = batchSize;
+		}
+
+		@Override
+		public void seek(long desired) throws IOException {
+			stream.seek(desired);
+			remainingInBatch = 0;  // after each seek, we need to start a new batch
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return stream.getPos();
+		}
+
+		@Override
+		public int read() throws IOException {
+			remainingInBatch--;
+			return stream.read();
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			remainingInBatch -= len;
+			return stream.read(b, off, len);
+		}
+
+		@Override
+		public void close() throws IOException {
+			stream.close();
+		}
+
+		boolean hasRemainingInBatch() {
+			return remainingInBatch > 0;
+		}
+
+		void newBatch() {
+			remainingInBatch = batchSize;
+		}
+
+		long getFileLength() {
+			return fileLength;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
new file mode 100644
index 0000000..5c378ed
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The {@code BulkFormat} reads and decodes batches of records at a time. Examples of bulk formats
+ * are formats like ORC or Parquet.
+ *
+ * <p>The outer {@code 'BulkFormat'} class acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link BulkFormat.Reader}, which is created in the
+ * {@link BulkFormat#createReader(Configuration, Path, long, long)} method. If a bulk reader is created
+ * based on a checkpoint during checkpointed streaming execution, then the reader is re-created in
+ * the {@link BulkFormat#restoreReader(Configuration, Path, long, long, CheckpointedPosition)} method.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts, (like the next record delimiter, or a
+ * block start or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>The bulk reader returns an iterator per batch that it reads. The iterator produces records together
+ * with a position. That position is stored in the checkpointed state atomically with the processing of
+ * the record. That means it must be the position from where the reading can be resumed AFTER
+ * the record was processed; the position hence points effectively to the record AFTER the current record.
+ *
+ * <p>The simplest way to return this position information is to store no offset and simply store an
+ * incrementing count of records to skip after recovery. Given the above contract, the first record would
+ * be returned with a records-to-skip count of one, the second one with a count of two, etc.
+ *
+ * <p>Formats that have the ability to efficiently seek to a record (or to every n-th record) can use
+ * the position field to seek to a record directly and avoid having to read and discard many records
+ * on recovery.
+ *
+ * <p>Note on this design: Why do we not make the position point to the current record and always skip
+ * one record after recovery (the just processed record)? We need to be able to support formats where
+ * skipping records (even one) is not an option. For example formats that execute (pushed down) filters
+ * may want to avoid a skip-record-count altogether, so that they don't skip the wrong records when
+ * the filter gets updated around a checkpoint/savepoint.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>For the {@code BulkFormat}, one batch (as returned by {@link BulkFormat.Reader#readBatch()}) is
+ * handed over as one.
+ */
+@PublicEvolving
+public interface BulkFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Creates a new reader that reads from {@code filePath} starting at {@code offset} and reads
+	 * until {@code length} bytes after the offset.
+	 */
+	BulkFormat.Reader<T> createReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength) throws IOException;
+
+	/**
+	 * Creates a new reader that reads from {@code filePath} starting at {@code offset} and reads
+	 * until {@code length} bytes after the offset. The {@code checkpointedPosition} describes the
+	 * offset and the number of records to read and discard after that offset; it is used to restore
+	 * a reader to a previously checkpointed position.
+	 */
+	BulkFormat.Reader<T> restoreReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength,
+			CheckpointedPosition checkpointedPosition) throws IOException;
+
+	/**
+	 * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+	 * per file, so that Flink can read multiple regions of the file concurrently.
+	 *
+	 * <p>See {@link BulkFormat top-level JavaDocs} (section "Splitting") for details.
+	 */
+	boolean isSplittable();
+
+	/**
+	 * Gets the type produced by this format. This type will be the type produced by the file
+	 * source as a whole.
+	 */
+	@Override
+	TypeInformation<T> getProducedType();
+
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The actual reader that reads the batches of records.
+	 */
+	interface Reader<T> extends Closeable {
+
+		/**
+		 * Reads one batch. The method should return null when reaching the end of the input.
+		 * The returned batch will be handed over to the processing threads as one.
+		 *
+		 * <p>The returned iterator object and any contained objects may be held onto by the file source
+		 * for some time, so it should not be immediately reused by the reader.
+		 *
+		 * <p>To implement reuse and to save object allocation, consider using a
+		 * {@link org.apache.flink.connector.file.src.util.Pool} and recycling objects into the Pool in
+		 * the {@link RecordIterator#releaseBatch()} method.
+		 */
+		@Nullable
+		RecordIterator<T> readBatch() throws IOException;
+
+		/**
+		 * Closes the reader and should release all resources.
+		 */
+		@Override
+		void close() throws IOException;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An iterator over records with their position in the file.
+	 * The iterator is closeable to support clean resource release and recycling.
+	 *
+	 * @param <T> The type of the record.
+	 */
+	interface RecordIterator<T> {
+
+		/**
+		 * Gets the next record from the file, together with its position.
+		 *
+		 * <p>The position information returned with the record points to the record AFTER the returned
+		 * record, because it defines the point where reading should resume once the current record
+		 * is emitted. The position information is put into the source's state when the record
+		 * is emitted. If a checkpoint is taken directly after the record is emitted, the checkpoint must
+		 * describe where to resume the source reading from after that record.
+		 *
+		 * <p>Objects returned by this method may be reused by the iterator. By the time that this
+		 * method is called again, no object returned from the previous call will be referenced any more.
+		 * That makes it possible to have a single {@link MutableRecordAndPosition} object and return the
+		 * same instance (with updated record and position) on every call.
+		 */
+		@Nullable
+		RecordAndPosition<T> next();
+
+		/**
+		 * Releases the batch that this iterator iterated over.
+		 * This is not supposed to close the reader and its resources, but is simply a signal that this
+		 * iterator is no longer used. This method can be used as a hook to recycle/reuse heavyweight
+		 * object structures.
+		 */
+		void releaseBatch();
+	}
+}
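
To make the checkpointing contract concrete, the following method-level sketch (not part of the
commit above; decodeNextBatch() and the long field recordsEmitted are hypothetical) shows a
readBatch() implementation that uses the simplest scheme described in the JavaDocs: no offset,
only an incrementing records-to-skip count, built with the IteratorResultIterator utility from
this commit.

    @Nullable
    @Override
    public BulkFormat.RecordIterator<String> readBatch() throws IOException {
        final List<String> records = decodeNextBatch();    // hypothetical decoding step
        if (records == null || records.isEmpty()) {
            return null;                                   // end of the split
        }
        // hand out positions (NO_OFFSET, recordsEmitted + 1), (NO_OFFSET, recordsEmitted + 2), ...
        // so that each record's position points to the record AFTER it
        final BulkFormat.RecordIterator<String> batch = new IteratorResultIterator<>(
                records.iterator(), CheckpointedPosition.NO_OFFSET, recordsEmitted);
        recordsEmitted += records.size();                  // hypothetical field tracking emitted records
        return batch;
    }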
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java
new file mode 100644
index 0000000..a5f1def
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A reader format that reads individual records from a file.
+ *
+ * <p>This format is for cases where the readers need access to the file directly or need to create a custom
+ * stream. For readers that can work directly on input streams, consider using the {@link StreamFormat}, which
+ * is more robust.
+ *
+ * <p>The outer class {@code FileRecordFormat} acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link FileRecordFormat.Reader}, which is created based on an
+ * input stream in the {@link #createReader(Configuration, Path, long, long)} method
+ * and restored (from checkpointed positions) in the method
+ * {@link #restoreReader(Configuration, Path, long, long, long)}.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts, (like the next record delimiter, or a
+ * block start or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>Readers can optionally return the current position of the reader, via the
+ * {@link FileRecordFormat.Reader#getCheckpointedPosition()}. This can improve recovery speed from
+ * a checkpoint.
+ *
+ * <p>By default (if that method is not overridden or returns null), then recovery from a checkpoint
+ * works by reading the split again and skipping the number of records that were processed before
+ * the checkpoint. Implementing this method allows formats to directly seek to that position, rather
+ * than read and discard a number of records.
+ *
+ * <p>The position is a combination of offset in the file and a number of records to skip after
+ * this offset (see {@link CheckpointedPosition}). This helps formats that cannot describe all
+ * record positions by an offset, for example  because records are compressed in batches or stored
+ * in a columnar layout (e.g., ORC, Parquet).
+ * The default behavior can be viewed as returning a {@code CheckpointedPosition} where the offset
+ * is always zero and only the {@link CheckpointedPosition#getRecordsAfterOffset()} is incremented
+ * with each emitted record.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>This batching is by default based on a number of records. See {@link FileRecordFormat#RECORDS_PER_FETCH}
+ * to configure that handover batch size.
+ */
+@PublicEvolving
+public interface FileRecordFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Creates a new reader to read in this format. This method is called when a fresh reader is
+	 * created for a split that was assigned from the enumerator. This method may also be called on
+	 * recovery from a checkpoint, if the reader never stored an offset in the checkpoint
+	 * (see {@link #restoreReader(Configuration, Path, long, long, long)} for details).
+	 *
+	 * <p>The {@code splitOffset} is the start of the split within the file and {@code splitLength} is
+	 * the number of bytes in the split. For non-splittable formats, the single split always covers
+	 * the entire file.
+	 */
+	FileRecordFormat.Reader<T> createReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength) throws IOException;
+
+	/**
+	 * Restores a reader from a checkpointed position. This method is called when the reader is recovered
+	 * from a checkpoint and the reader has previously stored an offset into the checkpoint, by returning
+	 * from the {@link FileRecordFormat.Reader#getCheckpointedPosition()} a value with non-negative
+	 * {@link CheckpointedPosition#getOffset() offset}. That value is supplied as the {@code restoredOffset}.
+	 *
+	 * <p>If the reader never produced a {@code CheckpointedPosition} with a non-negative offset before, then
+	 * this method is not called, and the reader is created in the same way as a fresh reader via the method
+	 * {@link #createReader(Configuration, Path, long, long)} and the appropriate number of
+	 * records are read and discarded, to position the reader at the checkpointed position.
+	 *
+	 * <p>The {@code splitOffset} is the start of the split within the file and {@code splitLength} is
+	 * the number of bytes in the split. For non-splittable formats, the single split always covers
+	 * the entire file.
+	 */
+	FileRecordFormat.Reader<T> restoreReader(
+			Configuration config,
+			Path filePath,
+			long restoredOffset,
+			long splitOffset,
+			long splitLength) throws IOException;
+
+	/**
+	 * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+	 * per file, so that Flink can read multiple regions of the file concurrently.
+	 *
+	 * <p>See {@link StreamFormat top-level JavaDocs} (section "Splitting") for details.
+	 */
+	boolean isSplittable();
+
+	/**
+	 * Gets the type produced by this format. This type will be the type produced by the file
+	 * source as a whole.
+	 */
+	@Override
+	TypeInformation<T> getProducedType();
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Config option for the number of records to hand over in each fetch.
+	 *
+	 * <p>The number should be large enough so that the thread-to-thread handover overhead
+	 * is amortized across the records, but small enough so that these records together do
+	 * not consume too much memory.
+	 */
+	ConfigOption<Integer> RECORDS_PER_FETCH = ConfigOptions
+		.key("source.file.records.fetch-size")
+		.intType()
+		.defaultValue(128)
+		.withDescription("The number of records to hand over from the I/O thread to file reader in one unit.");
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The actual reader that reads the records.
+	 */
+	interface Reader<T> extends Closeable {
+
+		/**
+		 * Reads the next record. Returns {@code null} when the input has reached its end.
+		 */
+		@Nullable
+		T read() throws IOException;
+
+		/**
+		 * Closes the reader to release all resources.
+		 */
+		@Override
+		void close() throws IOException;
+
+		/**
+		 * Optionally returns the current position of the reader. This can be implemented by readers that
+		 * want to speed up recovery from a checkpoint.
+		 *
+		 * <p>The current position of the reader is the position of the next record that will be returned
+		 * in a call to {@link #read()}. This can be implemented by readers that want to speed up recovery
+		 * from a checkpoint.
+		 *
+		 * <p>See the {@link FileRecordFormat top-level class comment} (section "Checkpointing") for details.
+		 */
+		@Nullable
+		default CheckpointedPosition getCheckpointedPosition() {
+			return null;
+		}
+	}
+}
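
As a usage illustration (not part of the commit above), a minimal non-splittable FileRecordFormat
that emits one text line per record. All class and variable names here are the editor's own; the
sketch relies only on the interfaces shown in this commit plus standard Flink and Java APIs.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.reader.FileRecordFormat;
    import org.apache.flink.core.fs.Path;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public final class TextLineRecordFormat implements FileRecordFormat<String> {

        @Override
        public Reader<String> createReader(
                Configuration config, Path filePath, long splitOffset, long splitLength) throws IOException {
            // non-splittable: the single split starts at offset 0 and covers the whole file
            final BufferedReader in = new BufferedReader(
                    new InputStreamReader(filePath.getFileSystem().open(filePath), StandardCharsets.UTF_8));
            return new Reader<String>() {
                @Override
                public String read() throws IOException {
                    return in.readLine();   // null signals the end of the input
                }

                @Override
                public void close() throws IOException {
                    in.close();
                }
            };
        }

        @Override
        public Reader<String> restoreReader(
                Configuration config, Path filePath, long restoredOffset,
                long splitOffset, long splitLength) throws IOException {
            // this format never checkpoints an offset (getCheckpointedPosition() keeps its default
            // null), so recovery re-creates a fresh reader and skips already-emitted records instead
            return createReader(config, filePath, splitOffset, splitLength);
        }

        @Override
        public boolean isSplittable() {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

The handover batch size for such a format is controlled by the 'source.file.records.fetch-size'
option (RECORDS_PER_FETCH) defined above, which the FileRecordFormatAdapter reads when building
each batch.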
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java
new file mode 100644
index 0000000..938b40e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A simple version of the {@link StreamFormat}, for formats that are not splittable.
+ *
+ * <p>This format does not distinguish between creating readers from scratch (for a new file) or from a
+ * checkpoint. Because of that, if the reader actively checkpoints its position
+ * (via the {@link Reader#getCheckpointedPosition()} method), then the checkpointed offset must be
+ * a byte offset in the file from which the stream can be resumed as if it were the beginning of the file.
+ *
+ * <p>For all other details, please check the docs of {@link StreamFormat}.
+ *
+ * @param <T> The type of records created by this format reader.
+ */
+@PublicEvolving
+public abstract class SimpleStreamFormat<T> implements StreamFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new reader. This method is called both for the creation of a new reader (from the beginning
+	 * of a file) and for restoring checkpointed readers.
+	 *
+	 * <p>If the reader previously checkpointed an offset, then the input stream will be positioned to
+	 * that particular offset. Readers checkpoint an offset by returning a value from the
+	 * {@link Reader#getCheckpointedPosition()} method with an offset other than
+	 * {@link CheckpointedPosition#NO_OFFSET}.
+	 */
+	public abstract Reader<T> createReader(Configuration config, FSDataInputStream stream) throws IOException;
+
+	/**
+	 * Gets the type produced by this format. This type will be the type produced by the file
+	 * source as a whole.
+	 */
+	@Override
+	public abstract TypeInformation<T> getProducedType();
+
+	// ------------------------------------------------------------------------
+	//  pre-defined methods from Stream Format
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This format is never splittable.
+	 */
+	@Override
+	public final boolean isSplittable() {
+		return false;
+	}
+
+	@Override
+	public final Reader<T> createReader(
+			Configuration config,
+			FSDataInputStream stream,
+			long fileLen,
+			long splitEnd) throws IOException {
+
+		checkNotSplit(fileLen, splitEnd);
+
+		final long streamPos = stream.getPos();
+		checkArgument(streamPos == 0L,
+				"SimpleStreamFormat is not splittable, but found non-zero stream position (%s)",
+				streamPos);
+
+		return createReader(config, stream);
+	}
+
+	@Override
+	public final Reader<T> restoreReader(
+			final Configuration config,
+			final FSDataInputStream stream,
+			final long restoredOffset,
+			final long fileLen,
+			final long splitEnd) throws IOException {
+
+		checkNotSplit(fileLen, splitEnd);
+		stream.seek(restoredOffset);
+		return createReader(config, stream);
+	}
+
+	private static void checkNotSplit(long fileLen, long splitEnd) {
+		if (splitEnd != fileLen) {
+			throw new IllegalArgumentException(String.format(
+					"SimpleStreamFormat is not splittable, but found split end (%d) different from file length (%d)",
+					splitEnd, fileLen));
+		}
+	}
+}
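
For comparison, a sketch (again with hypothetical names, not part of the commit above) of a
SimpleStreamFormat that decodes a stream of big-endian 4-byte integers. Because the format does
not override getCheckpointedPosition(), recovery uses the default behavior of re-reading the
stream and skipping the already-emitted records.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
    import org.apache.flink.core.fs.FSDataInputStream;

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;

    public final class IntStreamFormat extends SimpleStreamFormat<Integer> {

        @Override
        public Reader<Integer> createReader(Configuration config, FSDataInputStream stream) throws IOException {
            final DataInputStream in = new DataInputStream(stream);
            return new Reader<Integer>() {
                @Override
                public Integer read() throws IOException {
                    try {
                        return in.readInt();        // one record per 4-byte integer
                    } catch (EOFException e) {
                        return null;                // end of the stream
                    }
                }

                @Override
                public void close() throws IOException {
                    in.close();
                }
            };
        }

        @Override
        public TypeInformation<Integer> getProducedType() {
            return BasicTypeInfo.INT_TYPE_INFO;
        }
    }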
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java
new file mode 100644
index 0000000..57424b2
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A reader format that reads individual records from a stream.
+ *
+ * <p>The outer class {@code StreamFormat} acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link StreamFormat.Reader}, which is created based on an
+ * input stream in the {@link #createReader(Configuration, FSDataInputStream, long, long)} method
+ * and restored (from checkpointed positions) in the method
+ * {@link #restoreReader(Configuration, FSDataInputStream, long, long, long)}.
+ *
+ * <p>Compared to the {@link BulkFormat}, the stream format handles a few things out-of-the-box, like
+ * deciding how to batch records or dealing with compression.
+ *
+ * <p>For a simpler version of this interface, for formats that do not support splitting or logical record
+ * offsets during checkpointing, see {@link SimpleStreamFormat}.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts, (like the next record delimiter, or a
+ * block start or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>Readers can optionally return the current position of the reader, via the
+ * {@link StreamFormat.Reader#getCheckpointedPosition()}. This can improve recovery speed from
+ * a checkpoint.
+ *
+ * <p>By default (if that method is not overridden or returns null), then recovery from a checkpoint
+ * works by reading the split again and skipping the number of records that were processed before
+ * the checkpoint. Implementing this method allows formats to directly seek to that position, rather
+ * than read and discard a number of records.
+ *
+ * <p>The position is a combination of offset in the file and a number of records to skip after
+ * this offset (see {@link CheckpointedPosition}). This helps formats that cannot describe all
+ * record positions by an offset, for example  because records are compressed in batches or stored
+ * in a columnar layout (e.g., ORC, Parquet).
+ * The default behavior can be viewed as returning a {@code CheckpointedPosition} where the offset
+ * is always zero and only the {@link CheckpointedPosition#getRecordsAfterOffset()} is incremented
+ * with each emitted record.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>This batching is by default based on the I/O fetch size for the {@code StreamFormat}, meaning
+ * the set of records derived from one I/O buffer will be handed over as one.
+ * See {@link StreamFormat#FETCH_IO_SIZE} to configure that fetch size.
+ *
+ * @param <T> The type of records created by this format reader.
+ */
+@PublicEvolving
+public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Creates a new reader to read in this format. This method is called when a fresh reader is
+	 * created for a split that was assigned from the enumerator. This method may also be called on
+	 * recovery from a checkpoint, if the reader never stored an offset in the checkpoint
+	 * (see {@link #restoreReader(Configuration, FSDataInputStream, long, long, long)} for details).
+	 *
+	 * <p>If the format is {@link #isSplittable() splittable}, then the {@code stream} is positioned
+	 * to the beginning of the file split, otherwise it will be at position zero.
+	 *
+	 * <p>The {@code fileLen} is the length of the entire file, while {@code splitEnd} is the offset of
+	 * the first byte after the split end boundary (exclusive end boundary). For non-splittable formats,
+	 * both values are identical.
+	 */
+	Reader<T> createReader(
+			Configuration config,
+			FSDataInputStream stream,
+			long fileLen,
+			long splitEnd) throws IOException;
+
+	/**
+	 * Restores a reader from a checkpointed position. This method is called when the reader is recovered
+	 * from a checkpoint and the reader has previously stored an offset into the checkpoint, by returning
+	 * from the {@link Reader#getCheckpointedPosition()} a value with non-negative
+	 * {@link CheckpointedPosition#getOffset() offset}. That value is supplied as the {@code restoredOffset}.
+	 *
+	 * <p>If the format is {@link #isSplittable() splittable}, then the {@code stream} is positioned
+	 * to the beginning of the file split, otherwise it will be at position zero. The stream is NOT
+	 * positioned to the checkpointed offset, because the format is free to interpret this offset in
+	 * a different way than the byte offset in the file (for example as a record index).
+	 *
+	 * <p>If the reader never produced a {@code CheckpointedPosition} with a non-negative offset before, then
+	 * this method is not called, and the reader is created in the same way as a fresh reader via the method
+	 * {@link #createReader(Configuration, FSDataInputStream, long, long)} and the appropriate number of
+	 * records are read and discarded, to position the reader at the checkpointed position.
+	 *
+	 * <p>Having a different method for restoring readers to a checkpointed position allows readers to
+	 * seek to the start position differently in that case, compared to when the reader is created from
+	 * a split offset generated at the enumerator. In the latter case, the offsets are commonly "approximate",
+	 * because the enumerator typically generates splits based only on metadata. Readers then have to skip some
+	 * bytes while searching for the next position to start from (based on a delimiter, sync marker, block
+	 * offset, etc.). In contrast, checkpointed offsets are often precise, because they were recorded as the
+	 * reader went through the data stream. Starting a reader from a checkpointed offset hence does not
+	 * require a search for the next delimiter/block/marker.
+	 *
+	 * <p>The {@code fileLen} is the length of the entire file, while {@code splitEnd} is the offset of
+	 * the first byte after the split end boundary (exclusive end boundary). For non-splittable formats,
+	 * both values are identical.
+	 */
+	Reader<T> restoreReader(
+			Configuration config,
+			FSDataInputStream stream,
+			long restoredOffset,
+			long fileLen,
+			long splitEnd) throws IOException;
+
+	/**
+	 * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+	 * per file, so that Flink can read multiple regions of the file concurrently.
+	 *
+	 * <p>See {@link StreamFormat top-level JavaDocs} (section "Splitting") for details.
+	 */
+	boolean isSplittable();
+
+	/**
+	 * Gets the type produced by this format. This type will be the type produced by the file
+	 * source as a whole.
+	 */
+	@Override
+	TypeInformation<T> getProducedType();
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The config option to define how many bytes are read by the I/O thread in one fetch operation.
+	 */
+	ConfigOption<MemorySize> FETCH_IO_SIZE = ConfigOptions
+			.key("source.file.stream.io-fetch-size")
+			.memoryType()
+			.defaultValue(MemorySize.ofMebiBytes(1L))
+			.withDescription("The approximate number of bytes per fetch that is passed from the I/O thread to the file reader.");
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The actual reader that reads the records.
+	 */
+	interface Reader<T> extends Closeable {
+
+		/**
+		 * Reads the next record. Returns {@code null} when the input has reached its end.
+		 */
+		@Nullable
+		T read() throws IOException;
+
+		/**
+		 * Closes the reader to release all resources.
+		 */
+		@Override
+		void close() throws IOException;
+
+		/**
+		 * Optionally returns the current position of the reader. This can be implemented by readers that
+		 * want to speed up recovery from a checkpoint.
+		 *
+		 * <p>The current position of the reader is the position of the next record that will be returned
+		 * in a call to {@link #read()}.
+		 *
+		 * <p>See the {@link StreamFormat top-level class comment} (section "Checkpointing") for details.
+		 */
+		@Nullable
+		default CheckpointedPosition getCheckpointedPosition() {
+			return null;
+		}
+	}
+}
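
Note: to make the createReader/restoreReader/getCheckpointedPosition contract above concrete, here is a minimal, hypothetical sketch of a non-splittable StreamFormat for fixed-length binary records. The class name and the 128-byte record size are made up for illustration and are not part of this change. Because every record has a known length, the reader can report a precise byte offset, so restoreReader(...) can seek to it directly instead of re-reading and discarding records.

import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.core.fs.FSDataInputStream;

import javax.annotation.Nullable;

import java.io.IOException;

/** Illustrative only: fixed-length binary records with precise checkpointed positions. */
public final class FixedLengthRecordFormat implements StreamFormat<byte[]> {

    private static final long serialVersionUID = 1L;

    private static final int RECORD_LENGTH = 128; // assumed record size, for the sketch

    @Override
    public StreamFormat.Reader<byte[]> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException {
        // fresh reader: for a non-splittable format the stream is at position zero
        return new FixedLengthReader(stream, 0L, splitEnd);
    }

    @Override
    public StreamFormat.Reader<byte[]> restoreReader(
            Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException {
        // the checkpointed offset is a precise byte offset here, so seek straight to it
        stream.seek(restoredOffset);
        return new FixedLengthReader(stream, restoredOffset, splitEnd);
    }

    @Override
    public boolean isSplittable() {
        return false;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }

    private static final class FixedLengthReader implements StreamFormat.Reader<byte[]> {

        private final FSDataInputStream in;
        private final long end;
        private long pos;

        FixedLengthReader(FSDataInputStream in, long startPos, long end) {
            this.in = in;
            this.pos = startPos;
            this.end = end;
        }

        @Nullable
        @Override
        public byte[] read() throws IOException {
            if (pos + RECORD_LENGTH > end) {
                return null; // no complete record left before the end
            }
            final byte[] record = new byte[RECORD_LENGTH];
            int off = 0;
            while (off < RECORD_LENGTH) {
                final int n = in.read(record, off, RECORD_LENGTH - off);
                if (n == -1) {
                    return null; // truncated file; a real format would raise an error here
                }
                off += n;
            }
            pos += RECORD_LENGTH;
            return record;
        }

        @Nullable
        @Override
        public CheckpointedPosition getCheckpointedPosition() {
            // byte offset of the NEXT record; nothing needs to be skipped after that offset
            return new CheckpointedPosition(pos, 0L);
        }

        @Override
        public void close() throws IOException {
            in.close();
        }
    }
}
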
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
new file mode 100644
index 0000000..4eccd36
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * A reader format that reads text lines from a file.
+ *
+ * <p>The reader uses Java's built-in {@link InputStreamReader} to decode the byte stream using various
+ * supported charset encodings.
+ *
+ * <p>This format does not support optimized recovery from checkpoints. On recovery, it will re-read and
+ * discard the number of lines that were processed before the last checkpoint. This is because
+ * the offsets of lines in the file cannot be tracked through the charset decoders with their
+ * internal buffering of stream input and charset decoder state.
+ */
+@PublicEvolving
+public class TextLineFormat extends SimpleStreamFormat<String> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final String DEFAULT_CHARSET_NAME = "UTF-8";
+
+	private final String charsetName;
+
+	public TextLineFormat() {
+		this(DEFAULT_CHARSET_NAME);
+	}
+
+	public TextLineFormat(String charsetName) {
+		this.charsetName = charsetName;
+	}
+
+	@Override
+	public Reader createReader(Configuration config, FSDataInputStream stream) throws IOException {
+		final BufferedReader reader = new BufferedReader(new InputStreamReader(stream, charsetName));
+		return new Reader(reader);
+	}
+
+	@Override
+	public TypeInformation<String> getProducedType() {
+		return Types.STRING;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The actual reader for the {@code TextLineFormat}.
+	 */
+	public static final class Reader implements StreamFormat.Reader<String> {
+
+		private final BufferedReader reader;
+
+		Reader(final BufferedReader reader) {
+			this.reader = reader;
+		}
+
+		@Nullable
+		@Override
+		public String read() throws IOException {
+			return reader.readLine();
+		}
+
+		@Override
+		public void close() throws IOException {
+			reader.close();
+		}
+	}
+}
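
For reference, a minimal usage sketch of this format with the new FileSource. The input path and job wiring are illustrative; the fromSource/WatermarkStrategy calls are the usual FLIP-27 wiring, as also used by the integration test further below.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TextLinesExample {
    public static void main(String[] args) throws Exception {
        final FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("file:///path/to/input"))
                .build();

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "text-lines");

        lines.print();
        env.execute("text lines example");
    }
}
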
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java
new file mode 100644
index 0000000..af500c5
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns the elements of an array, one after the other.
+ * The iterator is mutable to support object reuse and supports recycling.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class ArrayResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+	private E[] records;
+	private int num;
+	private int pos;
+
+	private final MutableRecordAndPosition<E> recordAndPosition;
+
+	/**
+	 * Creates a new, initially empty, {@code ArrayResultIterator}.
+	 */
+	public ArrayResultIterator() {
+		this(null);
+	}
+
+	/**
+	 * Creates a new, initially empty, {@code ArrayResultIterator}.
+	 * The iterator calls the given {@code recycler} when the {@link #releaseBatch()} method is called.
+	 */
+	public ArrayResultIterator(@Nullable Runnable recycler) {
+		super(recycler);
+		this.recordAndPosition = new MutableRecordAndPosition<>();
+	}
+
+	// -------------------------------------------------------------------------
+	//  Setting
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Sets the records to be returned by this iterator.
+	 * Each record's {@link RecordAndPosition} will have the same offset (for {@link RecordAndPosition#getOffset()}).
+	 * The first returned record will have a records-to-skip count of {@code skipCountOfFirst + 1}, following
+	 * the contract that each record needs to point to the position AFTER itself
+	 * (because a checkpoint taken after the record was emitted needs to resume from after that record).
+	 */
+	public void set(final E[] records, final int num, final long offset, final long skipCountOfFirst) {
+		this.records = records;
+		this.num = num;
+		this.pos = 0;
+		this.recordAndPosition.set(null, offset, skipCountOfFirst);
+	}
+
+	// -------------------------------------------------------------------------
+	//  Result Iterator Methods
+	// -------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public RecordAndPosition<E> next() {
+		if (pos < num) {
+			recordAndPosition.setNext(records[pos++]);
+			return recordAndPosition;
+		}
+		else {
+			return null;
+		}
+	}
+}
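
A small, self-contained illustration of the batch-handover contract described above (the literal offsets and counts are made up): the iterator is filled once per batch, and each returned RecordAndPosition points to the position after the record just returned.

import org.apache.flink.connector.file.src.util.ArrayResultIterator;
import org.apache.flink.connector.file.src.util.RecordAndPosition;

public class ArrayBatchExample {
    public static void main(String[] args) {
        final String[] records = {"a", "b", "c"};

        // the batch starts at byte offset 1024 of the split; 17 records were emitted before it
        final ArrayResultIterator<String> batch = new ArrayResultIterator<>();
        batch.set(records, records.length, 1024L, 17L);

        RecordAndPosition<String> rec;
        while ((rec = batch.next()) != null) {
            // the first record reports (offset=1024, skipCount=18), i.e. the position AFTER itself
            System.out.println(rec.getRecord() + " @ " + rec.getOffset() + " + " + rec.getRecordSkipCount());
        }
        batch.releaseBatch(); // invokes the recycler, if one was passed to the constructor
    }
}
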
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java
new file mode 100644
index 0000000..bbdd09c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The position of a reader, to be stored in a checkpoint. The position consists of a record offset
+ * and a number of records to skip after that offset. The offset is optional, it may take the value
+ * {@link CheckpointedPosition#NO_OFFSET}, in which case only the records-to-skip count is used.
+ *
+ * <p>The combination of offset and records-to-skip makes it possible to represent the position of a wide
+ * variety of readers. In the simplest case, readers might store no offset and only store how many
+ * records they previously returned. On the other hand, readers that can precisely point to each
+ * record via a position can store that in the checkpoint. Readers that have occasional addressable positions
+ * (like sync markers, block starts, etc.) can store those together with the records skipped after the
+ * last marker.
+ */
+@PublicEvolving
+public final class CheckpointedPosition implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Constant for the offset, reflecting that the position does not contain any offset information.
+	 * It is used in positions that are defined only by a number of records to skip.
+	 */
+	public static final long NO_OFFSET = -1L;
+
+	private final long offset;
+	private final long recordsAfterOffset;
+
+	/**
+	 * Creates a new CheckpointedPosition for given offset and records-to-skip.
+	 *
+	 * @param offset The offset that the reader will seek to when restored from this checkpoint.
+	 * @param recordsAfterOffset The records to skip after the offset.
+	 */
+	public CheckpointedPosition(long offset, long recordsAfterOffset) {
+		checkArgument(offset >= 0 || offset == NO_OFFSET, "offset must be >= 0 or NO_OFFSET");
+		checkArgument(recordsAfterOffset >= 0, "recordsAfterOffset must be >= 0");
+		this.offset = offset;
+		this.recordsAfterOffset = recordsAfterOffset;
+	}
+
+	/**
+	 * Gets the offset that the reader will seek to when restored from this checkpoint.
+	 */
+	public long getOffset() {
+		return offset;
+	}
+
+	/**
+	 * Gets the records to skip after the offset.
+	 */
+	public long getRecordsAfterOffset() {
+		return recordsAfterOffset;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final CheckpointedPosition that = (CheckpointedPosition) o;
+		return offset == that.offset &&
+			recordsAfterOffset == that.recordsAfterOffset;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(offset, recordsAfterOffset);
+	}
+
+	@Override
+	public String toString() {
+		return String.format("CheckpointedPosition: offset=%s, recordsToSkip=%d",
+				offset == NO_OFFSET ? "NO_OFFSET" : String.valueOf(offset),
+				recordsAfterOffset);
+	}
+}
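
Two tiny, illustrative positions showing the intended use of the offset and records-after-offset fields (the values are made up):

import org.apache.flink.connector.file.src.util.CheckpointedPosition;

public class PositionExamples {
    // a reader that can only count records: no offset, re-read the split and skip 42 records on recovery
    static final CheckpointedPosition BY_COUNT =
            new CheckpointedPosition(CheckpointedPosition.NO_OFFSET, 42L);

    // a reader with addressable block starts: seek to byte 1_048_576, then skip 3 records
    static final CheckpointedPosition BY_BLOCK =
            new CheckpointedPosition(1_048_576L, 3L);
}
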
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java
new file mode 100644
index 0000000..5369e54
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns the elements of an iterator,
+ * augmented with position information.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class IteratorResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+	private final Iterator<E> records;
+
+	private final MutableRecordAndPosition<E> recordAndPosition;
+
+	/**
+	 * Creates a new {@code RecordIterator} returning the records from the given iterator, augmented
+	 * with their position information.
+	 *
+	 * <p>Each record's {@link RecordAndPosition} will have the same offset value for
+	 * {@link RecordAndPosition#getOffset()}. The first returned record will have a records-to-skip count
+	 * of {@code startingSkipCount + 1}, following the contract that each record needs to point to the
+	 * position AFTER itself (because a checkpoint taken after the record was emitted needs to resume
+	 * from after that record).
+	 */
+	public IteratorResultIterator(
+			final Iterator<E> records,
+			final long offset,
+			final long startingSkipCount) {
+		this(records, offset, startingSkipCount, null);
+	}
+
+	/**
+	 * Creates a new {@code RecordIterator} returning the records from the given iterator, augmented
+	 * with their position information. When the iterator is marked as done (via {@link #releaseBatch()}), the
+	 * given {@code recycler} is called.
+	 *
+	 * <p>Each record's {@link RecordAndPosition} will have the same offset value for
+	 * {@link RecordAndPosition#getOffset()}. The first returned record will have a records-to-skip count
+	 * of {@code startingSkipCount + 1}, following the contract that each record needs to point to the
+	 * position AFTER itself (because a checkpoint taken after the record was emitted needs to resume
+	 * from after that record).
+	 */
+	public IteratorResultIterator(
+			final Iterator<E> records,
+			final long offset,
+			final long startingSkipCount,
+			final @Nullable Runnable recycler) {
+
+		super(recycler);
+		this.records = records;
+		this.recordAndPosition = new MutableRecordAndPosition<>();
+		this.recordAndPosition.setPosition(offset, startingSkipCount);
+	}
+
+	// -------------------------------------------------------------------------
+	//  Result Iterator Methods
+	// -------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public RecordAndPosition<E> next() {
+		if (records.hasNext()) {
+			recordAndPosition.setNext(records.next());
+			return recordAndPosition;
+		} else {
+			return null;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java
new file mode 100644
index 0000000..07de831
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+/**
+ * A mutable version of the {@link RecordAndPosition}.
+ *
+ * <p>This mutable object is useful in cases where only one instance of a {@code RecordAndPosition}
+ * is needed at a time, like for the result values of the {@link BulkFormat.RecordIterator}.
+ */
+@PublicEvolving
+public class MutableRecordAndPosition<E> extends RecordAndPosition<E> {
+
+	/**
+	 * Updates the record and position in this object.
+	 */
+	public void set(E record, long offset, long recordSkipCount) {
+		this.record = record;
+		this.offset = offset;
+		this.recordSkipCount = recordSkipCount;
+	}
+
+	/**
+	 * Sets the position without setting a record.
+	 */
+	public void setPosition(long offset, long recordSkipCount) {
+		this.offset = offset;
+		this.recordSkipCount = recordSkipCount;
+	}
+
+	/**
+	 * Sets the next record of a sequence. This increments the {@code recordSkipCount} by one.
+	 */
+	public void setNext(E record) {
+		this.record = record;
+		this.recordSkipCount++;
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java
new file mode 100644
index 0000000..274e775
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * A pool to cache and recycle heavyweight objects, to reduce object allocation.
+ *
+ * <p>This pool can be used in the {@link BulkFormat.Reader}, when the returned objects are heavyweight
+ * and need to be reused for efficiency. Because the reading happens in I/O threads while the record
+ * processing happens in Flink's main processing threads, these objects cannot be reused immediately
+ * after being returned. They can be reused once they are recycled back to the pool.
+ *
+ * @param <T> The type of object cached in the pool.
+ */
+@PublicEvolving
+public class Pool<T> {
+
+	private final ArrayBlockingQueue<T> pool;
+
+	private final Recycler<T> recycler;
+
+	private final int poolCapacity;
+	private int poolSize;
+
+	/**
+	 * Creates a pool with the given capacity. No more than that many elements may be added to the pool.
+	 */
+	public Pool(int poolCapacity) {
+		this.pool = new ArrayBlockingQueue<>(poolCapacity);
+		this.recycler = this::addBack;
+		this.poolCapacity = poolCapacity;
+		this.poolSize = 0;
+	}
+
+	/**
+	 * Gets the recycler for this pool. The recycler returns its given objects back to this pool.
+	 */
+	public Recycler<T> recycler() {
+		return recycler;
+	}
+
+	/**
+	 * Adds an entry to the pool.
+	 * This method fails if called more often than the pool capacity specified during construction.
+	 */
+	public synchronized void add(T object) {
+		if (poolSize >= poolCapacity) {
+			throw new IllegalStateException("No space left in pool");
+		}
+		poolSize++;
+
+		addBack(object);
+	}
+
+	/**
+	 * Gets the next cached entry. This blocks until the next entry is available.
+	 */
+	public T pollEntry() throws InterruptedException {
+		return pool.take();
+	}
+
+	/**
+	 * Tries to get the next cached entry. If the pool is empty, this method returns null.
+	 */
+	@Nullable
+	public T tryPollEntry() {
+		return pool.poll();
+	}
+
+	/**
+	 * Internal callback to put an entry back to the pool.
+	 */
+	void addBack(T object) {
+		pool.add(object);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * A Recycler puts objects into the pool that the recycler is associated with.
+	 *
+	 * @param <T> The pooled and recycled type.
+	 */
+	@FunctionalInterface
+	public interface Recycler<T> {
+
+		/**
+		 * Recycles the given object to the pool that this recycler works with.
+		 */
+		void recycle(T object);
+	}
+}
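
A short, hypothetical usage sketch of the pool (the buffer size and the two-thread split are illustrative): buffers are registered once, the reading side blocks on pollEntry() until one is free, and the processing side hands it back through the recycler.

import org.apache.flink.connector.file.src.util.Pool;

public class PoolExample {
    public static void main(String[] args) throws InterruptedException {
        // a pool of two reusable 32 KiB buffers, registered up front
        final Pool<byte[]> pool = new Pool<>(2);
        pool.add(new byte[32 * 1024]);
        pool.add(new byte[32 * 1024]);

        // reading side: blocks until a buffer is free, then fills it and hands it downstream
        final byte[] buffer = pool.pollEntry();

        // processing side: once the batch is fully consumed, give the buffer back
        pool.recycler().recycle(buffer);
    }
}
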
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java
new file mode 100644
index 0000000..4c5079c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A record, together with the reader position to be stored in the checkpoint.
+ *
+ * <p>The position defines the point in the reader AFTER the record. Record processing and
+ * updating checkpointed state happen atomically. The position points to where the reader should
+ * resume after this record is processed.
+ *
+ * <p>For example, the very first record in a file split could have an {@code offset} of zero and a
+ * {@code recordSkipCount} of one.
+ *
+ * <p>This class is immutable for safety. Use {@link MutableRecordAndPosition} if you need a mutable
+ * version for efficiency.
+ *
+ * <p>Note on this design: Why do we not make the position point to the current record and always skip
+ * one record after recovery (the just processed record)? We need to be able to support formats where
+ * skipping records (even one) is not an option. For example, formats that execute (pushed-down) filters
+ * may want to avoid a skip-record-count altogether, so that they don't skip the wrong records when
+ * the filter gets updated around a checkpoint/savepoint.
+ */
+@PublicEvolving
+public class RecordAndPosition<E> {
+
+	/**
+	 * Constant for the offset, reflecting that the position does not contain any offset information.
+	 * It is used in positions that are defined only by a number of records to skip.
+	 */
+	public static final long NO_OFFSET = CheckpointedPosition.NO_OFFSET;
+
+	// these are package private and non-final so we can mutate them from the MutableRecordAndPosition
+	E record;
+	long offset;
+	long recordSkipCount;
+
+	/**
+	 * Creates a new {@code RecordAndPosition} with the given record and position info.
+	 */
+	public RecordAndPosition(E record, long offset, long recordSkipCount) {
+		this.record = record;
+		this.offset = offset;
+		this.recordSkipCount = recordSkipCount;
+	}
+
+	/**
+	 * Package private constructor for the {@link MutableRecordAndPosition}.
+	 */
+	RecordAndPosition() {}
+
+	// ------------------------------------------------------------------------
+
+	public E getRecord() {
+		return record;
+	}
+
+	public long getOffset() {
+		return offset;
+	}
+
+	public long getRecordSkipCount() {
+		return recordSkipCount;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return String.format("%s @ %d + %d", record, offset, recordSkipCount);
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java
new file mode 100644
index 0000000..47cf967
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility base class for iterators that accept a recycler.
+ *
+ * @param <E> The type of the records returned by the iterator.
+ */
+@Internal
+abstract class RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+	@Nullable
+	private final Runnable recycler;
+
+	/**
+	 * Creates a {@code RecyclableIterator} with the given optional recycler.
+	 */
+	protected RecyclableIterator(@Nullable Runnable recycler) {
+		this.recycler = recycler;
+	}
+
+	@Override
+	public void releaseBatch() {
+		if (recycler != null) {
+			recycler.run();
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java
new file mode 100644
index 0000000..7b56e5c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns a single value.
+ * The iterator is mutable to support object reuse and supports recycling.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class SingletonResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+	@Nullable
+	private RecordAndPosition<E> element;
+
+	private final MutableRecordAndPosition<E> recordAndPosition;
+
+	public SingletonResultIterator() {
+		this(null);
+	}
+
+	public SingletonResultIterator(@Nullable Runnable recycler) {
+		super(recycler);
+		this.recordAndPosition = new MutableRecordAndPosition<>();
+	}
+
+	// -------------------------------------------------------------------------
+	//  Setting
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Sets the single record to be returned by this iterator.
+	 * The offset and records-to-skip count will be used as provided here for the returned
+	 * {@link RecordAndPosition}, meaning they need to point to the position AFTER this specific record
+	 * (because a checkpoint taken after the record was processed needs to resume from after this record).
+	 */
+	public void set(final E element, final long offset, final long skipCount) {
+		this.recordAndPosition.set(element, offset, skipCount);
+		this.element = this.recordAndPosition;
+	}
+
+	// -------------------------------------------------------------------------
+	//  Result Iterator Methods
+	// -------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public RecordAndPosition<E> next() {
+		final RecordAndPosition<E> next = this.element;
+		this.element = null;
+		return next;
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java
new file mode 100644
index 0000000..bdadc1b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Miscellaneous utilities for the file source.
+ */
+@PublicEvolving
+public final class Utils {
+
+	/**
+	 * Runs the given {@code SupplierWithException} (a piece of code producing a result).
+	 * If an exception happens during that, the given closeable is quietly closed.
+	 */
+	public static <E> E doWithCleanupOnException(
+			final Closeable toCleanUp,
+			final SupplierWithException<E, IOException> code) throws IOException {
+
+		try {
+			return code.get();
+		}
+		catch (Throwable t) {
+			IOUtils.closeQuietly(toCleanUp);
+			ExceptionUtils.rethrowIOException(t);
+			return null; // silence the compiler
+		}
+	}
+
+	/**
+	 * Runs the given {@code ThrowingRunnable}. If an exception happens during that, the given closeable
+	 * is quietly closed.
+	 */
+	public static void doWithCleanupOnException(
+			final Closeable toCleanUp,
+			final ThrowingRunnable<IOException> code) throws IOException {
+
+		doWithCleanupOnException(toCleanUp, (SupplierWithException<Void, IOException>) () -> {
+			code.run();
+			return null;
+		});
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated. */
+	private Utils() {}
+}
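
As an illustration of where this helps (a sketch, assuming the TextLineFormat from this change and Flink's Path#getFileSystem): the opened stream must stay open on success because the returned reader owns it, but it must be closed if reader creation fails.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.file.src.util.Utils;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

public class UtilsExample {

    static TextLineFormat.Reader openTextReader(TextLineFormat format, Path file, Configuration config) throws IOException {
        final FSDataInputStream stream = file.getFileSystem().open(file);
        // keep the stream open on success (the reader owns it), but close it quietly
        // and rethrow if creating the reader throws
        return Utils.doWithCleanupOnException(stream, () -> format.createReader(config, stream));
    }
}
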
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
new file mode 100644
index 0000000..278d214
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.After;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * This test simulates readers that just produce byte arrays very fast. The test
+ * is meant to check that this does not break the system in terms of object allocations, etc.
+ */
+public class FileSourceHeavyThroughputTest {
+
+	/** Testing file system reference, to be cleaned up in an @After method. That way it also gets
+	 * cleaned up on a test failure, without needing finally clauses in every test. */
+	private TestingFileSystem testFs;
+
+	@After
+	public void unregisterTestFs() throws Exception {
+		if (testFs != null) {
+			testFs.unregister();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testHeavyThroughput() throws Exception {
+		final Path path = new Path("testfs:///testpath");
+		final long fileSize = 20L << 30; // 20 GB
+		final FileSourceSplit split = new FileSourceSplit("testsplitId", path, 0, fileSize);
+
+		testFs = TestingFileSystem.createForFileStatus(
+				path.toUri().getScheme(),
+				TestingFileSystem.TestFileStatus.forFileWithStream(path, fileSize, new GeneratingInputStream(fileSize)));
+		testFs.register();
+
+		final FileSource<byte[]> source = FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+		final SourceReader<byte[], FileSourceSplit> reader = source.createReader(new NoOpReaderContext());
+		reader.addSplits(Collections.singletonList(split));
+		reader.handleSourceEvents(new NoMoreSplitsEvent());
+
+		final ReaderOutput<byte[]> out = new NoOpReaderOutput<>();
+
+		InputStatus status;
+		while ((status = reader.pollNext(out)) != InputStatus.END_OF_INPUT) {
+			// if nothing is available currently, wait for more
+			if (status == InputStatus.NOTHING_AVAILABLE) {
+				reader.isAvailable().get();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class ArrayReader implements StreamFormat.Reader<byte[]> {
+
+		private static final int ARRAY_SIZE = 1 << 20; // 1 MiByte
+
+		private final FSDataInputStream in;
+
+		ArrayReader(FSDataInputStream in) {
+			this.in = in;
+		}
+
+		@Nullable
+		@Override
+		public byte[] read() throws IOException {
+			final byte[] array = new byte[ARRAY_SIZE];
+			final int read = in.read(array);
+			if (read == array.length) {
+				return array;
+			} else if (read == -1) {
+				return null;
+			} else {
+				return Arrays.copyOf(array, read);
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			in.close();
+		}
+	}
+
+	private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream) throws IOException {
+			return new ArrayReader(stream);
+		}
+
+		@Override
+		public TypeInformation<byte[]> getProducedType() {
+			return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+		}
+	}
+
+	private static final class GeneratingInputStream extends FSDataInputStream {
+
+		private final long length;
+		private long pos;
+
+		GeneratingInputStream(long length) {
+			this.length = length;
+		}
+
+		@Override
+		public void seek(long desired) throws IOException {
+			checkArgument(desired >= 0 && desired <= length);
+			pos = desired;
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return pos;
+		}
+
+		@Override
+		public int read() throws IOException {
+			if (pos < length) {
+				pos++;
+				return 0;
+			} else {
+				return -1;
+			}
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			if (pos < length) {
+				final int remaining = (int) Math.min(len, length - pos);
+				pos += remaining;
+				return remaining;
+			} else {
+				return -1;
+			}
+		}
+	}
+
+	private static final class NoOpReaderContext implements SourceReaderContext {
+
+		@Override
+		public MetricGroup metricGroup() {
+			return new UnregisteredMetricsGroup();
+		}
+
+		@Override
+		public Configuration getConfiguration() {
+			return new Configuration();
+		}
+
+		@Override
+		public String getLocalHostName() {
+			return "localhost";
+		}
+
+		@Override
+		public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+	}
+
+	private static final class NoOpReaderOutput<E> implements ReaderOutput<E> {
+
+		@Override
+		public void collect(E record) {}
+
+		@Override
+		public void collect(E record, long timestamp) {}
+
+		@Override
+		public void emitWatermark(Watermark watermark) {}
+
+		@Override
+		public void markIdle() {}
+
+		@Override
+		public SourceOutput<E> createOutputForSplit(String splitId) {
+			return this;
+		}
+
+		@Override
+		public void releaseOutputForSplit(String splitId) {}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java
new file mode 100644
index 0000000..16f5720
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Unit tests for the {@link FileSourceSplitSerializer}.
+ */
+public class FileSourceSplitSerializerTest {
+
+	@Test
+	public void serializeSplitWithHosts() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"random-id",
+			new Path("hdfs://namenode:14565/some/path/to/a/file"),
+			100_000_000,
+			64_000_000,
+			"host1", "host2", "host3");
+
+		final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+		assertSplitsEqual(split, deSerialized);
+	}
+
+	@Test
+	public void serializeSplitWithoutHosts() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"some-id",
+			new Path("file:/some/path/to/a/file"),
+			0,
+			0);
+
+		final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+		assertSplitsEqual(split, deSerialized);
+	}
+
+	@Test
+	public void serializeSplitWithReaderPosition() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"random-id",
+			new Path("hdfs://namenode:14565/some/path/to/a/file"),
+			100_000_000,
+			64_000_000,
+			new String[] {"host1", "host2", "host3"},
+			new CheckpointedPosition(7665391L, 100L));
+
+		final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+		assertSplitsEqual(split, deSerialized);
+	}
+
+	@Test
+	public void repeatedSerialization() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"an-id",
+			new Path("s3://some-bucket/key/to/the/object"),
+			0,
+			1234567);
+
+		serializeAndDeserialize(split);
+		serializeAndDeserialize(split);
+		final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+		assertSplitsEqual(split, deSerialized);
+	}
+
+	@Test
+	public void repeatedSerializationCaches() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"random-id",
+			new Path("hdfs://namenode:14565/some/path/to/a/file"),
+			100_000_000,
+			64_000_000,
+			"host1", "host2", "host3");
+
+		final byte[] ser1 = FileSourceSplitSerializer.INSTANCE.serialize(split);
+		final byte[] ser2 = FileSourceSplitSerializer.INSTANCE.serialize(split);
+
+		assertSame(ser1, ser2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utils
+	// ------------------------------------------------------------------------
+
+	private static FileSourceSplit serializeAndDeserialize(FileSourceSplit split) throws IOException {
+		final FileSourceSplitSerializer serializer = new FileSourceSplitSerializer();
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, split);
+		return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+	}
+
+	static void assertSplitsEqual(FileSourceSplit expected, FileSourceSplit actual) {
+		assertEquals(expected.splitId(), actual.splitId());
+		assertEquals(expected.path(), actual.path());
+		assertEquals(expected.offset(), actual.offset());
+		assertEquals(expected.length(), actual.length());
+		assertArrayEquals(expected.hostnames(), actual.hostnames());
+		assertEquals(expected.getReaderPosition(), actual.getReaderPosition());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java
new file mode 100644
index 0000000..95c704a
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link FileSourceSplitState}.
+ */
+public class FileSourceSplitStateTest {
+
+	@Test
+	public void testRoundTripWithoutModification() {
+		final FileSourceSplit split = getTestSplit();
+		final FileSourceSplitState state = new FileSourceSplitState(split);
+
+		final FileSourceSplit resultSplit = state.toFileSourceSplit();
+
+		assertEquals(split.getReaderPosition(), resultSplit.getReaderPosition());
+	}
+
+	@Test
+	public void testStateStartsWithSplitValues() {
+		final FileSourceSplit split = getTestSplit(new CheckpointedPosition(123L, 456L));
+		final FileSourceSplitState state = new FileSourceSplitState(split);
+
+		assertEquals(123L, state.getOffset());
+		assertEquals(456L, state.getRecordsToSkipAfterOffset());
+	}
+
+	@Test
+	public void testNewSplitTakesModifiedOffsetAndCount() {
+		final FileSourceSplit split = getTestSplit();
+		final FileSourceSplitState state = new FileSourceSplitState(split);
+
+		state.setPosition(1234L, 7566L);
+		final Optional<CheckpointedPosition> position = state.toFileSourceSplit().getReaderPosition();
+
+		assertTrue(position.isPresent());
+		assertEquals(new CheckpointedPosition(1234L, 7566L), position.get());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static FileSourceSplit getTestSplit() {
+		return getTestSplit(null);
+	}
+
+	private static FileSourceSplit getTestSplit(CheckpointedPosition position) {
+		return new FileSourceSplit(
+				"test-id",
+				new Path("file:/some/random/path"),
+				17,
+				121,
+				new String[] {"localhost"},
+				position);
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java
new file mode 100644
index 0000000..e58ef6f
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for the {@link FileSourceSplit}.
+ */
+public class FileSourceSplitTest {
+
+	@Test(expected = IllegalArgumentException.class)
+	public void noNullHostsAllowed() {
+		new FileSourceSplit("id", new Path("file:/some/random/path"), 0, 10, "host1", null, "host2");
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
new file mode 100644
index 0000000..8876e7e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * MiniCluster-based integration test for the {@link FileSource}.
+ */
+public class FileSourceTextLinesITCase extends TestLogger {
+
+	private static final int PARALLELISM = 4;
+
+	@ClassRule
+	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+	@ClassRule
+	public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(PARALLELISM)
+			.build());
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This test runs a job reading bounded input with a stream record format (text lines).
+	 */
+	@Test
+	public void testBoundedTextFileSource() throws Exception {
+		final File testDir = TMP_FOLDER.newFolder();
+
+		// our main test data
+		writeAllFiles(testDir);
+
+		// write some junk to hidden files to test that common hidden file patterns are filtered out by default
+		writeHiddenJunkFiles(testDir);
+
+		final FileSource<String> source = FileSource
+				.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+				.build();
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		final DataStream<String> stream = env.fromSource(
+				source,
+				WatermarkStrategy.noWatermarks(),
+				"file-source");
+
+		final List<String> result = DataStreamUtils.collectBoundedStream(stream, "Bounded TextFiles Test");
+
+		verifyResult(result);
+	}
+
+	/**
+	 * This test runs a job reading continuous input (files appearing over time)
+	 * with a stream record format (text lines).
+	 */
+	@Test
+	public void testContinuousTextFileSource() throws Exception {
+		final File testDir = TMP_FOLDER.newFolder();
+
+		final FileSource<String> source = FileSource
+				.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+				.monitorContinuously(Duration.ofMillis(5))
+				.build();
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		final DataStream<String> stream = env.fromSource(
+				source,
+				WatermarkStrategy.noWatermarks(),
+				"file-source");
+
+		final DataStreamUtils.ClientAndIterator<String> client =
+				DataStreamUtils.collectWithClient(stream, "Continuous TextFiles Monitoring Test");
+
+		// write one file, execute, and wait for its result
+		// that way we know that the application was running and the source has
+		// done its first chunk of work already
+
+		final int numLinesFirst = LINES_PER_FILE[0].length;
+		final int numLinesAfter = LINES.length - numLinesFirst;
+
+		writeFile(testDir, 0);
+		final List<String> result1 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesFirst);
+
+		// write the remaining files over time, after that collect the final result
+		for (int i = 1; i < LINES_PER_FILE.length; i++) {
+			Thread.sleep(10);
+			writeFile(testDir, i);
+		}
+
+		final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesAfter);
+
+		// shut down the job, now that we have all the results we expected.
+		client.client.cancel().get();
+
+		result1.addAll(result2);
+		verifyResult(result1);
+	}
+
+	// ------------------------------------------------------------------------
+	//  verification
+	// ------------------------------------------------------------------------
+
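+	/** Verifies that the collected lines match the expected lines, independent of order. */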
+	private static void verifyResult(List<String> lines) {
+		final String[] expected = Arrays.copyOf(LINES, LINES.length);
+		final String[] actual = lines.toArray(new String[0]);
+
+		Arrays.sort(expected);
+		Arrays.sort(actual);
+
+		assertThat(actual, equalTo(expected));
+	}
+
+	// ------------------------------------------------------------------------
+	//  test data
+	// ------------------------------------------------------------------------
+
+	private static final String[] FILE_PATHS = new String[] {
+			"text.2",
+			"nested1/text.1",
+			"text.1",
+			"text.3",
+			"nested2/nested21/text",
+			"nested1/text.2",
+			"nested2/text"};
+
+	private static final String[] HIDDEN_JUNK_PATHS = new String[] {
+			// every path below contains a file or directory name starting with '.' or '_'
+			"_something",
+			".junk",
+			"nested1/.somefile",
+			"othernested/_ignoredfile",
+			"_nested/file",
+			"nested1/.intermediate/somefile"};
+
+	private static final String[] LINES = new String[] {
+		"To be, or not to be,--that is the question:--",
+		"Whether 'tis nobler in the mind to suffer",
+		"The slings and arrows of outrageous fortune",
+		"Or to take arms against a sea of troubles,",
+		"And by opposing end them?--To die,--to sleep,--",
+		"No more; and by a sleep to say we end",
+		"The heartache, and the thousand natural shocks",
+		"That flesh is heir to,--'tis a consummation",
+		"Devoutly to be wish'd. To die,--to sleep;--",
+		"To sleep! perchance to dream:--ay, there's the rub;",
+		"For in that sleep of death what dreams may come,",
+		"When we have shuffled off this mortal coil,",
+		"Must give us pause: there's the respect",
+		"That makes calamity of so long life;",
+		"For who would bear the whips and scorns of time,",
+		"The oppressor's wrong, the proud man's contumely,",
+		"The pangs of despis'd love, the law's delay,",
+		"The insolence of office, and the spurns",
+		"That patient merit of the unworthy takes,",
+		"When he himself might his quietus make",
+		"With a bare bodkin? who would these fardels bear,",
+		"To grunt and sweat under a weary life,",
+		"But that the dread of something after death,--",
+		"The undiscover'd country, from whose bourn",
+		"No traveller returns,--puzzles the will,",
+		"And makes us rather bear those ills we have",
+		"Than fly to others that we know not of?",
+		"Thus conscience does make cowards of us all;",
+		"And thus the native hue of resolution",
+		"Is sicklied o'er with the pale cast of thought;",
+		"And enterprises of great pith and moment,",
+		"With this regard, their currents turn awry,",
+		"And lose the name of action.--Soft you now!",
+		"The fair Ophelia!--Nymph, in thy orisons",
+		"Be all my sins remember'd."
+	};
+
+	private static final String[][] LINES_PER_FILE = splitLinesForFiles();
+
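+	/** Distributes the test lines evenly across the test files, with the last file taking the remainder. */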
+	private static String[][] splitLinesForFiles() {
+		final String[][] result = new String[FILE_PATHS.length][];
+
+		final int linesPerFile = LINES.length / FILE_PATHS.length;
+		final int linesForLastFile = LINES.length - ((FILE_PATHS.length - 1) * linesPerFile);
+
+		int pos = 0;
+		for (int i = 0; i < FILE_PATHS.length - 1; i++) {
+			String[] lines = new String[linesPerFile];
+			result[i] = lines;
+			for (int k = 0; k < lines.length; k++) {
+				lines[k] = LINES[pos++];
+			}
+		}
+		String[] lines = new String[linesForLastFile];
+		result[result.length - 1] = lines;
+		for (int k = 0; k < lines.length; k++) {
+			lines[k] = LINES[pos++];
+		}
+		return result;
+	}
+
+	private static void writeFile(File testDir, int num) throws IOException {
+		final File file = new File(testDir, FILE_PATHS[num]);
+		final File parent = file.getParentFile();
+		assertTrue(parent.mkdirs() || parent.exists());
+
+		try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
+			for (String line : LINES_PER_FILE[num]) {
+				writer.println(line);
+			}
+		}
+	}
+
+	private static void writeAllFiles(File testDir) throws IOException {
+		for (int i = 0; i < FILE_PATHS.length; i++) {
+			writeFile(testDir, i);
+		}
+	}
+
+	private static void writeHiddenJunkFiles(File testDir) throws IOException {
+		for (String junkPath : HIDDEN_JUNK_PATHS) {
+			final File file = new File(testDir, junkPath);
+			final File parent = file.getParentFile();
+			assertTrue(parent.mkdirs() || parent.exists());
+
+			try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
+				writer.println("This should not end up in the test result.");
+				writer.println("Foo bar bazzl junk");
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
new file mode 100644
index 0000000..bc89dce
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Unit tests for the {@link PendingSplitsCheckpointSerializer}.
+ */
+public class PendingSplitsCheckpointSerializerTest {
+
+	@Test
+	public void serializeEmptyCheckpoint() throws Exception {
+		final PendingSplitsCheckpoint checkpoint =
+				PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
+
+		final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+		assertCheckpointsEqual(checkpoint, deSerialized);
+	}
+
+	@Test
+	public void serializeSomeSplits() throws Exception {
+		final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+				Arrays.asList(testSplit1(), testSplit2(), testSplit3()));
+
+		final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+		assertCheckpointsEqual(checkpoint, deSerialized);
+	}
+
+	@Test
+	public void serializeSplitsAndProcessedPaths() throws Exception {
+		final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+				Arrays.asList(testSplit1(), testSplit2(), testSplit3()),
+				Arrays.asList(new Path("file:/some/path"), new Path("s3://bucket/key/and/path"), new Path("hdfs://namenode:12345/path")));
+
+		final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+		assertCheckpointsEqual(checkpoint, deSerialized);
+	}
+
+	@Test
+	public void repeatedSerialization() throws Exception {
+		final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+			Arrays.asList(testSplit3(), testSplit1()));
+
+		serializeAndDeserialize(checkpoint);
+		serializeAndDeserialize(checkpoint);
+		final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+		assertCheckpointsEqual(checkpoint, deSerialized);
+	}
+
+	@Test
+	public void repeatedSerializationCaches() throws Exception {
+		final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+				Collections.singletonList(testSplit2()));
+
+		final byte[] ser1 = PendingSplitsCheckpointSerializer.INSTANCE.serialize(checkpoint);
+		final byte[] ser2 = PendingSplitsCheckpointSerializer.INSTANCE.serialize(checkpoint);
+
+		assertSame(ser1, ser2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utils
+	// ------------------------------------------------------------------------
+
+	private static FileSourceSplit testSplit1() {
+		return new FileSourceSplit(
+				"random-id",
+				new Path("hdfs://namenode:14565/some/path/to/a/file"),
+				100_000_000,
+				64_000_000,
+				"host1", "host2", "host3");
+	}
+
+	private static FileSourceSplit testSplit2() {
+		return new FileSourceSplit(
+				"some-id",
+				new Path("file:/some/path/to/a/file"),
+				0,
+				0);
+	}
+
+	private static FileSourceSplit testSplit3() {
+		return new FileSourceSplit(
+				"an-id",
+				new Path("s3://some-bucket/key/to/the/object"),
+				0,
+				1234567);
+	}
+
+	private static PendingSplitsCheckpoint serializeAndDeserialize(PendingSplitsCheckpoint split) throws IOException {
+		final PendingSplitsCheckpointSerializer serializer = new PendingSplitsCheckpointSerializer();
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, split);
+		return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+	}
+
+	private static void assertCheckpointsEqual(PendingSplitsCheckpoint expected, PendingSplitsCheckpoint actual) {
+		assertOrderedCollectionEquals(expected.getSplits(), actual.getSplits(), FileSourceSplitSerializerTest::assertSplitsEqual);
+
+		assertOrderedCollectionEquals(
+				expected.getAlreadyProcessedPaths(), actual.getAlreadyProcessedPaths(), Assert::assertEquals);
+	}
+
+	private static <E> void assertOrderedCollectionEquals(
+			Collection<E> expected,
+			Collection<E> actual,
+			BiConsumer<E, E> equalityAsserter) {
+
+		assertEquals(expected.size(), actual.size());
+		final Iterator<E> expectedIter = expected.iterator();
+		final Iterator<E> actualIter = actual.iterator();
+		while (expectedIter.hasNext()) {
+			equalityAsserter.accept(expectedIter.next(), actualIter.next());
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java
new file mode 100644
index 0000000..24d0414
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Unit tests for the {@link BlockSplittingRecursiveEnumerator}.
+ */
+public class BlockSplittingRecursiveEnumeratorTest extends NonSplittingRecursiveEnumeratorTest {
+
+	@Test
+	@Override
+	public void testFileWithMultipleBlocks() throws Exception {
+		final Path testPath = new Path("testfs:///dir/file");
+		testFs = TestingFileSystem.createForFileStatus(
+				"testfs",
+				TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 1000L,
+						new TestingFileSystem.TestBlockLocation(0L, 100L, "host1", "host2"),
+						new TestingFileSystem.TestBlockLocation(100L, 520L, "host2", "host3"),
+						new TestingFileSystem.TestBlockLocation(620L, 380L, "host3", "host4")));
+		testFs.register();
+
+		final BlockSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+			new Path[] { new Path("testfs:///dir")}, 0);
+
+		final Collection<FileSourceSplit> expected = Arrays.asList(
+				new FileSourceSplit("ignoredId", testPath, 0L, 100L, "host1", "host2"),
+				new FileSourceSplit("ignoredId", testPath, 100L, 520L, "host2", "host3"),
+				new FileSourceSplit("ignoredId", testPath, 620L, 380L, "host3", "host4"));
+
+		assertSplitsEqual(expected, splits);
+	}
+
+	protected BlockSplittingRecursiveEnumerator createEnumerator() {
+		return new BlockSplittingRecursiveEnumerator();
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java
new file mode 100644
index 0000000..9837975
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link NonSplittingRecursiveEnumerator}.
+ */
+public class NonSplittingRecursiveEnumeratorTest {
+
+	/** Testing file system reference, to be cleaned up in an @After method. That way it also gets
+	 * cleaned up on a test failure, without needing finally clauses in every test. */
+	protected TestingFileSystem testFs;
+
+	@After
+	public void unregisterTestFs() throws Exception {
+		if (testFs != null) {
+			testFs.unregister();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testIncludeFilesFromNestedDirectories() throws Exception {
+		final Path[] testPaths = new Path[] {
+				new Path("testfs:///dir/file1"),
+				new Path("testfs:///dir/nested/file.out"),
+				new Path("testfs:///dir/nested/anotherfile.txt")};
+		testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///dir")}, 1);
+
+		assertThat(toPaths(splits), containsInAnyOrder(testPaths));
+	}
+
+	@Test
+	public void testDefaultHiddenFilesFilter() throws Exception {
+		final Path[] testPaths = new Path[] {
+				new Path("testfs:///visiblefile"),
+				new Path("testfs:///.hiddenfile1"),
+				new Path("testfs:///_hiddenfile2")};
+		testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///")}, 1);
+
+		assertEquals(Collections.singletonList(new Path("testfs:///visiblefile")), toPaths(splits));
+	}
+
+	@Test
+	public void testHiddenDirectories() throws Exception {
+		final Path[] testPaths = new Path[] {
+				new Path("testfs:///dir/visiblefile"),
+				new Path("testfs:///dir/.hiddendir/file"),
+				new Path("testfs:///_notvisible/afile")};
+		testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///")}, 1);
+
+		assertEquals(Collections.singletonList(new Path("testfs:///dir/visiblefile")), toPaths(splits));
+	}
+
+	@Test
+	public void testFilesWithNoBlockInfo() throws Exception {
+		final Path testPath = new Path("testfs:///dir/file1");
+		testFs = TestingFileSystem.createForFileStatus(
+				"testfs",
+				TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 12345L));
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///dir")}, 0);
+
+		assertEquals(1, splits.size());
+		assertSplitsEqual(
+				new FileSourceSplit("ignoredId", testPath, 0L, 12345L),
+				splits.iterator().next());
+	}
+
+	@Test
+	public void testFileWithIncorrectBlocks() throws Exception {
+		final Path testPath = new Path("testfs:///testdir/testfile");
+
+		testFs = TestingFileSystem.createForFileStatus(
+				"testfs",
+				TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 10000L,
+						new TestingFileSystem.TestBlockLocation(0L, 1000L),
+						new TestingFileSystem.TestBlockLocation(2000L, 1000L)));
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///testdir")}, 0);
+
+		assertEquals(1, splits.size());
+		assertSplitsEqual(
+				new FileSourceSplit("ignoredId", testPath, 0L, 10000L),
+				splits.iterator().next());
+	}
+
+	@Test
+	public void testFileWithMultipleBlocks() throws Exception {
+		final Path testPath = new Path("testfs:///dir/file");
+		testFs = TestingFileSystem.createForFileStatus(
+				"testfs",
+				TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 1000L,
+						new TestingFileSystem.TestBlockLocation(0L, 100L, "host1", "host2"),
+						new TestingFileSystem.TestBlockLocation(100L, 520L, "host2", "host3"),
+						new TestingFileSystem.TestBlockLocation(620L, 380L, "host3", "host4")));
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///dir")}, 0);
+
+		assertSplitsEqual(
+				new FileSourceSplit("ignoredId", testPath, 0L, 1000L, "host1", "host2", "host3", "host4"),
+				splits.iterator().next());
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The instantiation of the enumerator is overridable so that we can reuse these tests for sub-classes.
+	 */
+	protected NonSplittingRecursiveEnumerator createEnumerator() {
+		return new NonSplittingRecursiveEnumerator();
+	}
+
+	// ------------------------------------------------------------------------
+
+	protected static void assertSplitsEqual(final FileSourceSplit expected, final FileSourceSplit actual) {
+		assertEquals(expected.path(), actual.path());
+		assertEquals(expected.offset(), actual.offset());
+		assertEquals(expected.length(), actual.length());
+		assertArrayEquals(expected.hostnames(), actual.hostnames());
+	}
+
+	protected static void assertSplitsEqual(
+			final Collection<FileSourceSplit> expected,
+			final Collection<FileSourceSplit> actual) {
+
+		assertEquals(expected.size(), actual.size());
+
+		final ArrayList<FileSourceSplit> expectedCopy = new ArrayList<>(expected);
+		final ArrayList<FileSourceSplit> actualCopy = new ArrayList<>(actual);
+		expectedCopy.sort(NonSplittingRecursiveEnumeratorTest::compareFileSourceSplit);
+		actualCopy.sort(NonSplittingRecursiveEnumeratorTest::compareFileSourceSplit);
+
+		for (int i = 0; i < expectedCopy.size(); i++) {
+			assertSplitsEqual(expectedCopy.get(i), actualCopy.get(i));
+		}
+	}
+
+	protected static Collection<Path> toPaths(Collection<FileSourceSplit> splits) {
+		return splits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+	}
+
+	private static int compareFileSourceSplit(FileSourceSplit a, FileSourceSplit b) {
+		return Long.compare(a.offset(), b.offset());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java
new file mode 100644
index 0000000..ea59379
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for the adapter tests, shared by {@link StreamFormatAdapterTest} and {@link FileRecordFormatAdapterTest}.
+ */
+public abstract class AdapterTestBase<FormatT> {
+
+	@ClassRule
+	public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
+
+	protected static final int NUM_NUMBERS = 100;
+	protected static final long FILE_LEN = 4 * NUM_NUMBERS;
+
+	protected static Path testPath;
+
+	@BeforeClass
+	public static void writeTestFile() throws IOException {
+		final File testFile = new File(TMP_DIR.getRoot(), "testFile");
+		testPath = Path.fromLocalFile(testFile);
+
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(testFile))) {
+			for (int i = 0; i < NUM_NUMBERS; i++) {
+				out.writeInt(i);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  format specific instantiation
+	// ------------------------------------------------------------------------
+
+	protected abstract FormatT createCheckpointedFormat();
+
+	protected abstract FormatT createNonCheckpointedFormat();
+
+	protected abstract FormatT createFormatFailingInInstantiation();
+
+	protected abstract BulkFormat<Integer> wrapWithAdapter(FormatT format);
+
+	// ------------------------------------------------------------------------
+	//  shared tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testRecoverCheckpointedFormatOneSplit() throws IOException {
+		testReading(createCheckpointedFormat(), 1, 5, 44);
+	}
+
+	@Test
+	public void testRecoverCheckpointedFormatMultipleSplits() throws IOException {
+		testReading(createCheckpointedFormat(), 3, 11, 33, 56);
+	}
+
+	@Test
+	public void testRecoverNonCheckpointedFormatOneSplit() throws IOException {
+		testReading(createNonCheckpointedFormat(), 1, 5, 44);
+	}
+
+	private void testReading(FormatT format, int numSplits, int... recoverAfterRecords) throws IOException {
+		// add the end boundary for recovery
+		final int[] boundaries = Arrays.copyOf(recoverAfterRecords, recoverAfterRecords.length + 1);
+		boundaries[boundaries.length - 1] = NUM_NUMBERS;
+
+		// set a fetch size so that we get three records per fetch
+		final Configuration config = new Configuration();
+		config.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(10));
+
+		final BulkFormat<Integer> adapter = wrapWithAdapter(format);
+		final Queue<FileSourceSplit> splits = buildSplits(numSplits);
+		final List<Integer> result = new ArrayList<>();
+
+		FileSourceSplit currentSplit = null;
+		BulkFormat.Reader<Integer> currentReader = null;
+
+		for (int nextRecordToRecover : boundaries) {
+			final Tuple2<FileSourceSplit, CheckpointedPosition> toRecoverFrom = readNumbers(
+				currentReader, currentSplit,
+				adapter, splits, config,
+				result,
+				nextRecordToRecover - result.size());
+
+			currentSplit = toRecoverFrom.f0;
+			currentReader = adapter.restoreReader(config, currentSplit.path(), currentSplit.offset(), currentSplit.length(), toRecoverFrom.f1);
+		}
+
+		verifyIntListResult(result);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testClosesStreamIfReaderCreationFails() throws Exception {
+		// setup
+		final Path testPath = new Path("testFs:///testpath-1");
+		final CloseTestingInputStream in = new CloseTestingInputStream();
+		final TestingFileSystem testFs = TestingFileSystem.createForFileStatus("testFs",
+				TestingFileSystem.TestFileStatus.forFileWithStream(testPath, 1024, in));
+		testFs.register();
+
+		// test
+		final BulkFormat<Integer> adapter = wrapWithAdapter(createFormatFailingInInstantiation());
+		try {
+			adapter.createReader(new Configuration(), testPath, 0, 1024);
+		} catch (IOException ignored) {}
+
+		// assertions
+		assertTrue(in.closed);
+
+		// cleanup
+		testFs.unregister();
+	}
+
+	@Test
+	public void testClosesStreamIfReaderRestoreFails() throws Exception {
+		// setup
+		final Path testPath = new Path("testFs:///testpath-1");
+		final CloseTestingInputStream in = new CloseTestingInputStream();
+		final TestingFileSystem testFs = TestingFileSystem.createForFileStatus("testFs",
+			TestingFileSystem.TestFileStatus.forFileWithStream(testPath, 1024, in));
+		testFs.register();
+
+		// test
+		final BulkFormat<Integer> adapter = wrapWithAdapter(createFormatFailingInInstantiation());
+		try {
+			adapter.restoreReader(
+					new Configuration(), testPath, 0, 1024,
+					new CheckpointedPosition(0L, 5L));
+		} catch (IOException ignored) {}
+
+		// assertions
+		assertTrue(in.closed);
+
+		// cleanup
+		testFs.unregister();
+	}
+
+	// ------------------------------------------------------------------------
+	//  test helpers
+	// ------------------------------------------------------------------------
+
+	protected static void verifyIntListResult(List<Integer> result) {
+		assertEquals("wrong result size", NUM_NUMBERS, result.size());
+		int nextExpected = 0;
+		for (int next : result) {
+			if (next != nextExpected++) {
+				fail("Wrong result: " + result);
+			}
+		}
+	}
+
+	protected static void readNumbers(BulkFormat.Reader<Integer> reader, List<Integer> result, int num) throws IOException {
+		readNumbers(reader, null, null, null, null, result, num);
+	}
+
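+	/**
+	 * Reads {@code num} records, opening readers for further splits from the queue as needed, and
+	 * returns the split last read from together with the checkpointed position of the last emitted
+	 * record, so that a new reader can be restored from that point.
+	 */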
+	protected static Tuple2<FileSourceSplit, CheckpointedPosition> readNumbers(
+			BulkFormat.Reader<Integer> currentReader,
+			FileSourceSplit currentSplit,
+			BulkFormat<Integer> format,
+			Queue<FileSourceSplit> moreSplits,
+			Configuration config,
+			List<Integer> result,
+			int num) throws IOException {
+
+		long offset = Long.MIN_VALUE;
+		long skip = Long.MIN_VALUE;
+
+		// loop across splits
+		while (num > 0) {
+			if (currentReader == null) {
+				currentSplit = moreSplits.poll();
+				assertNotNull(currentSplit);
+				currentReader = format.createReader(config, currentSplit.path(), currentSplit.offset(), currentSplit.length());
+			}
+
+			// loop across batches
+			BulkFormat.RecordIterator<Integer> nextBatch;
+			while (num > 0 && (nextBatch = currentReader.readBatch()) != null) {
+
+				// loop across record in batch
+				RecordAndPosition<Integer> next;
+				while (num > 0 && (next = nextBatch.next()) != null) {
+					num--;
+					result.add(next.getRecord());
+					offset = next.getOffset();
+					skip = next.getRecordSkipCount();
+				}
+			}
+
+			currentReader.close();
+			currentReader = null;
+		}
+
+		return new Tuple2<>(currentSplit, new CheckpointedPosition(offset, skip));
+	}
+
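+	/** Splits the test file into {@code numSplits} consecutive ranges; the last split takes the remaining bytes. */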
+	static Queue<FileSourceSplit> buildSplits(int numSplits) {
+		final Queue<FileSourceSplit> splits = new ArrayDeque<>();
+		final long rangeForSplit = FILE_LEN / numSplits;
+
+		for (int i = 0; i < numSplits - 1; i++) {
+			splits.add(new FileSourceSplit("ID-" + i, testPath, i * rangeForSplit, rangeForSplit));
+		}
+		final long startOfLast = (numSplits - 1) * rangeForSplit;
+		splits.add(new FileSourceSplit("ID-" + (numSplits - 1), testPath, startOfLast, FILE_LEN - startOfLast));
+		return splits;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test Mocks and Stubs
+	// ------------------------------------------------------------------------
+
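+	/** Input stream stub that only records whether it was closed; actual reads are not supported. */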
+	private static class CloseTestingInputStream extends FSDataInputStream {
+
+		boolean closed;
+
+		@Override
+		public void seek(long desired) throws IOException {}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public int read() throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void close() throws IOException {
+			closed = true;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java
new file mode 100644
index 0000000..779f38e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit and behavior tests for the {@link FileRecordFormatAdapter}.
+ */
+@SuppressWarnings("serial")
+public class FileRecordFormatAdapterTest extends AdapterTestBase<FileRecordFormat<Integer>> {
+
+	@Override
+	protected FileRecordFormat<Integer> createCheckpointedFormat() {
+		return new IntFileRecordFormat(true);
+	}
+
+	@Override
+	protected FileRecordFormat<Integer> createNonCheckpointedFormat() {
+		return new IntFileRecordFormat(false);
+	}
+
+	@Override
+	protected FileRecordFormat<Integer> createFormatFailingInInstantiation() {
+		return new FailingInstantiationFormat();
+	}
+
+	@Override
+	protected BulkFormat<Integer> wrapWithAdapter(FileRecordFormat<Integer> format) {
+		return new FileRecordFormatAdapter<>(format);
+	}
+
+	// ------------------------------------------------------------------------
+	//  not applicable tests
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void testClosesStreamIfReaderCreationFails() throws Exception {
+		// ignore this test
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class IntFileRecordFormat implements FileRecordFormat<Integer> {
+
+		private final boolean checkpointed;
+
+		IntFileRecordFormat(boolean checkpointed) {
+			this.checkpointed = checkpointed;
+		}
+
+		@Override
+		public Reader<Integer> createReader(
+				Configuration config,
+				Path filePath,
+				long splitOffset,
+				long splitLength) throws IOException {
+
+			final FileSystem fs = filePath.getFileSystem();
+			final FileStatus status = fs.getFileStatus(filePath);
+			final FSDataInputStream in = fs.open(filePath);
+
+			final long fileLen = status.getLen();
+			final long splitEnd = splitOffset + splitLength;
+
+			assertEquals("invalid file length", 0, fileLen % 4);
+
+			// round all positions up to the next integer boundary to simulate common split behavior:
+			// we round up even when already on an exact boundary, except at the start and the end of the file
+			final long start = splitOffset == 0L ? 0L : splitOffset + 4 - splitOffset % 4;
+			final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+			in.seek(start);
+
+			return new TestIntReader(in, end, checkpointed);
+		}
+
+		@Override
+		public Reader<Integer> restoreReader(
+				Configuration config,
+				Path filePath,
+				long restoredOffset,
+				long splitOffset,
+				long splitLength) throws IOException {
+
+			final FileSystem fs = filePath.getFileSystem();
+			final FileStatus status = fs.getFileStatus(filePath);
+			final FSDataInputStream in = fs.open(filePath);
+
+			final long fileLen = status.getLen();
+			final long splitEnd = splitOffset + splitLength;
+
+			assertEquals("invalid file length", 0, fileLen % 4);
+
+			// round end position to the next integer boundary
+			final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+			// no rounding of checkpointed offset
+			in.seek(restoredOffset);
+			return new TestIntReader(in, end, checkpointed);
+		}
+
+		@Override
+		public boolean isSplittable() {
+			return true;
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+	}
+
+	private static final class FailingInstantiationFormat implements FileRecordFormat<Integer> {
+
+		@Override
+		public Reader<Integer> createReader(
+				Configuration config,
+				Path filePath,
+				long splitOffset,
+				long splitLength) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Reader<Integer> restoreReader(
+				Configuration config,
+				Path filePath,
+				long restoredOffset,
+				long splitOffset,
+				long splitLength) throws IOException {
+
+			final FSDataInputStream in = filePath.getFileSystem().open(filePath);
+			return new FailingReader(in);
+		}
+
+		@Override
+		public boolean isSplittable() {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+
+		private static final class FailingReader implements FileRecordFormat.Reader<Integer> {
+
+			private final FSDataInputStream stream;
+
+			FailingReader(FSDataInputStream stream) {
+				this.stream = stream;
+			}
+
+			@Nullable
+			@Override
+			public Integer read() throws IOException {
+				throw new IOException("test exception");
+			}
+
+			@Override
+			public void close() throws IOException {
+				stream.close();
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java
new file mode 100644
index 0000000..42bc443
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.connector.file.src.util.SingletonResultIterator;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link FileRecords} class.
+ */
+public class FileRecordsTest {
+
+	@Test
+	public void testEmptySplits() {
+		final String split = "empty";
+		final FileRecords<Object> records = FileRecords.finishedSplit(split);
+
+		assertEquals(Collections.singleton(split), records.finishedSplits());
+	}
+
+	@Test
+	public void testMoveToFirstSplit() {
+		final String splitId = "splitId";
+		final FileRecords<Object> records = FileRecords.forRecords(splitId, new SingletonResultIterator<>());
+
+		final String firstSplitId = records.nextSplit();
+
+		assertEquals(splitId, firstSplitId);
+	}
+
+	@Test
+	public void testMoveToSecondSplit() {
+		final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+		records.nextSplit();
+
+		final String secondSplitId = records.nextSplit();
+
+		assertNull(secondSplitId);
+	}
+
+	@Test
+	public void testRecordsFromFirstSplit() {
+		final SingletonResultIterator<String> iter = new SingletonResultIterator<>();
+		iter.set("test", 18, 99);
+		final FileRecords<String> records = FileRecords.forRecords("splitId", iter);
+		records.nextSplit();
+
+		final RecordAndPosition<String> recAndPos = records.nextRecordFromSplit();
+
+		assertEquals("test", recAndPos.getRecord());
+		assertEquals(18, recAndPos.getOffset());
+		assertEquals(99, recAndPos.getRecordSkipCount());
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testRecordsInitiallyIllegal() {
+		final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+
+		records.nextRecordFromSplit();
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testRecordsOnSecondSplitIllegal() {
+		final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+		records.nextSplit();
+		records.nextSplit();
+
+		records.nextRecordFromSplit();
+	}
+
+	@Test
+	public void testRecycleExhaustedBatch() {
+		final AtomicBoolean recycled = new AtomicBoolean(false);
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+		iter.set(new Object(), 1L, 2L);
+
+		final FileRecords<Object> records = FileRecords.forRecords("test split", iter);
+		records.nextSplit();
+		records.nextRecordFromSplit();
+
+		// make sure we exhausted the iterator
+		assertNull(records.nextRecordFromSplit());
+		assertNull(records.nextSplit());
+
+		records.recycle();
+		assertTrue(recycled.get());
+	}
+
+	@Test
+	public void testRecycleNonExhaustedBatch() {
+		final AtomicBoolean recycled = new AtomicBoolean(false);
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+		iter.set(new Object(), 1L, 2L);
+
+		final FileRecords<Object> records = FileRecords.forRecords("test split", iter);
+		records.nextSplit();
+
+		records.recycle();
+		assertTrue(recycled.get());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java
new file mode 100644
index 0000000..29e74b1
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit and behavior tests for the {@link StreamFormatAdapter}.
+ */
+@SuppressWarnings("serial")
+public class StreamFormatAdapterTest extends AdapterTestBase<StreamFormat<Integer>> {
+
+	// ------------------------------------------------------------------------
+	//  Factories for Shared Tests
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected StreamFormat<Integer> createCheckpointedFormat() {
+		return new CheckpointedIntFormat();
+	}
+
+	@Override
+	protected StreamFormat<Integer> createNonCheckpointedFormat() {
+		return new NonCheckpointedIntFormat();
+	}
+
+	@Override
+	protected StreamFormat<Integer> createFormatFailingInInstantiation() {
+		return new FailingInstantiationFormat();
+	}
+
+	@Override
+	protected BulkFormat<Integer> wrapWithAdapter(StreamFormat<Integer> format) {
+		return new StreamFormatAdapter<>(format);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Additional Unit Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testReadSmallBatchSize() throws IOException {
+		simpleReadTest(1);
+	}
+
+	@Test
+	public void testBatchSizeMatchesOneRecord() throws IOException {
+		simpleReadTest(4);
+	}
+
+	@Test
+	public void testBatchSizeIsRecordMultiple() throws IOException {
+		simpleReadTest(20);
+	}
+
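+	/** Reads the whole test file with the given fetch size and checks that the complete number sequence is produced. */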
+	private void simpleReadTest(int batchSize) throws IOException {
+		final Configuration config = new Configuration();
+		config.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(batchSize));
+		final StreamFormatAdapter<Integer> format = new StreamFormatAdapter<>(new CheckpointedIntFormat());
+		final BulkFormat.Reader<Integer> reader = format.createReader(config, testPath, 0L, FILE_LEN);
+
+		final List<Integer> result = new ArrayList<>();
+		readNumbers(reader, result, NUM_NUMBERS);
+
+		verifyIntListResult(result);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class CheckpointedIntFormat implements StreamFormat<Integer> {
+
+		@Override
+		public Reader<Integer> createReader(
+				Configuration config,
+				FSDataInputStream stream,
+				long fileLen,
+				long splitEnd) throws IOException {
+
+			assertEquals("invalid file length", 0, fileLen % 4);
+
+			// round all positions up to the next integer boundary to simulate common split behavior:
+			// we round up even when already on an exact boundary, except at the start and the end of the file
+			final long currPos = stream.getPos();
+			final long start = currPos == 0L ? 0L : currPos + 4 - currPos % 4;
+			final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+			stream.seek(start);
+
+			return new TestIntReader(stream, end, true);
+		}
+
+		@Override
+		public Reader<Integer> restoreReader(
+				Configuration config,
+				FSDataInputStream stream,
+				long restoredOffset,
+				long fileLen,
+				long splitEnd) throws IOException {
+
+			assertEquals("invalid file length", 0, fileLen % 4);
+
+			// round end position to the next integer boundary
+			final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+			// no rounding of checkpointed offset
+			stream.seek(restoredOffset);
+			return new TestIntReader(stream, end, true);
+		}
+
+		@Override
+		public boolean isSplittable() {
+			return true;
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+	}
+
+	private static final class NonCheckpointedIntFormat extends SimpleStreamFormat<Integer> {
+
+		@Override
+		public Reader<Integer> createReader(Configuration config, FSDataInputStream stream) throws IOException {
+			return new TestIntReader(stream, Long.MAX_VALUE, false);
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+	}
+
+	private static final class FailingInstantiationFormat implements StreamFormat<Integer> {
+
+		@Override
+		public Reader<Integer> createReader(
+				Configuration config,
+				FSDataInputStream stream,
+				long fileLen,
+				long splitEnd) throws IOException {
+			throw new IOException("test exception");
+		}
+
+		@Override
+		public Reader<Integer> restoreReader(
+				Configuration config,
+				FSDataInputStream stream,
+				long restoredOffset,
+				long fileLen,
+				long splitEnd) throws IOException {
+			throw new IOException("test exception");
+		}
+
+		@Override
+		public boolean isSplittable() {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java
new file mode 100644
index 0000000..b37e02e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * A simple reader for integers that is both a {@link StreamFormat.Reader} and a {@link FileRecordFormat.Reader}.
+ */
+class TestIntReader implements StreamFormat.Reader<Integer>, FileRecordFormat.Reader<Integer> {
+
+	private static final int SKIPS_PER_OFFSET = 7;
+
+	private final FSDataInputStream in;
+	private final DataInputStream din;
+
+	private final long endOffset;
+	private long currentOffset;
+	private long currentSkipCount;
+
+	private final boolean checkpointed;
+
+	TestIntReader(FSDataInputStream in, long endOffset, boolean checkpointsPosition) throws IOException {
+		this.in = in;
+		this.endOffset = endOffset;
+		this.currentOffset = in.getPos();
+		this.din = new DataInputStream(in);
+		this.checkpointed = checkpointsPosition;
+	}
+
+	@Nullable
+	@Override
+	public Integer read() throws IOException {
+		if (in.getPos() >= endOffset) {
+			return null;
+		}
+
+		try {
+			final int next = din.readInt();
+			incrementPosition();
+			return next;
+		} catch (EOFException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		in.close();
+	}
+
+	@Nullable
+	@Override
+	public CheckpointedPosition getCheckpointedPosition() {
+		return checkpointed ? new CheckpointedPosition(currentOffset, currentSkipCount) : null;
+	}
+
+	private void incrementPosition() {
+		currentSkipCount++;
+		if (currentSkipCount >= SKIPS_PER_OFFSET) {
+			currentOffset += 4 * currentSkipCount;
+			currentSkipCount = 0;
+		}
+	}
+}
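
TestIntReader above advances its checkpointed offset only in whole batches of SKIPS_PER_OFFSET records and reports the remainder as a skip count. A small sketch of that bookkeeping (class and method names are invented, and it assumes the reader starts at offset 0):

    public class CoarsePositionSketch {

        static final int SKIPS_PER_OFFSET = 7; // same constant as in TestIntReader
        static final int BYTES_PER_INT = 4;    // each record is one 4-byte int

        // Returns {offset, recordsAfterOffset}, matching what TestIntReader would report
        // through getCheckpointedPosition() after reading the given number of records.
        static long[] checkpointedPositionAfter(long recordsRead) {
            long fullBatches = recordsRead / SKIPS_PER_OFFSET;
            long offset = fullBatches * SKIPS_PER_OFFSET * BYTES_PER_INT;
            long recordsAfterOffset = recordsRead % SKIPS_PER_OFFSET;
            return new long[] {offset, recordsAfterOffset};
        }

        public static void main(String[] args) {
            long[] pos = checkpointedPositionAfter(10);
            // after 10 ints: restore by seeking to byte 28 and re-reading 3 records
            System.out.println(pos[0] + " / " + pos[1]); // prints "28 / 3"
        }
    }
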
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java
new file mode 100644
index 0000000..3596024
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.testutils;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link FileSystem} for tests containing a pre-defined set of files and directories.
+ *
+ * <p>The file system can be registered under its scheme at the file system registry, so that
+ * the {@link Path#getFileSystem()} method finds this test file system. Use the
+ * {@link TestingFileSystem#register()} method to do that, and don't forget to call
+ * {@link TestingFileSystem#unregister()} when the test is done.
+ */
+public class TestingFileSystem extends FileSystem {
+
+	private final String scheme;
+
+	private final Map<Path, Collection<FileStatus>> directories;
+
+	private final Map<Path, TestFileStatus> files;
+
+	private TestingFileSystem(
+			final String scheme,
+			final Map<Path, Collection<FileStatus>> directories,
+			final Map<Path, TestFileStatus> files) {
+		this.scheme = scheme;
+		this.directories = directories;
+		this.files = files;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factories
+	// ------------------------------------------------------------------------
+
+	public static TestingFileSystem createWithFiles(final String scheme, final Path... files) {
+		return createWithFiles(scheme, Arrays.asList(files));
+	}
+
+	public static TestingFileSystem createWithFiles(final String scheme, final Collection<Path> files) {
+		checkNotNull(scheme, "scheme");
+		checkNotNull(files, "files");
+
+		final Collection<TestFileStatus> status = files.stream()
+				.map((path) -> TestFileStatus.forFileWithDefaultBlock(path, 10L << 20))
+				.collect(Collectors.toList());
+
+		return createForFileStatus(scheme, status);
+	}
+
+	public static TestingFileSystem createForFileStatus(final String scheme, final TestFileStatus... files) {
+		return createForFileStatus(scheme, Arrays.asList(files));
+	}
+
+	public static TestingFileSystem createForFileStatus(final String scheme, final Collection<TestFileStatus> files) {
+		checkNotNull(scheme, "scheme");
+		checkNotNull(files, "files");
+
+		final HashMap<Path, TestFileStatus> fileMap = new HashMap<>(files.size());
+		final HashMap<Path, Collection<FileStatus>> directories = new HashMap<>();
+
+		for (TestFileStatus file : files) {
+			if (fileMap.putIfAbsent(file.getPath(), file) != null) {
+				throw new IllegalStateException("Already have a status for path " + file.getPath());
+			}
+			addParentDirectories(file, fileMap, directories);
+		}
+
+		return new TestingFileSystem(scheme, directories, fileMap);
+	}
+
+	private static void addParentDirectories(
+			final TestFileStatus file,
+			final Map<Path, TestFileStatus> files,
+			final Map<Path, Collection<FileStatus>> directories) {
+
+		final Path parentPath = file.getPath().getParent();
+		if (parentPath == null) {
+			return;
+		}
+
+		final TestFileStatus parentStatus = TestFileStatus.forDirectory(parentPath);
+		directories.computeIfAbsent(parentPath, (key) -> new ArrayList<>()).add(file);
+
+		final TestFileStatus existingParent = files.putIfAbsent(parentPath, parentStatus);
+		if (existingParent == null) {
+			addParentDirectories(parentStatus, files, directories);
+		} else {
+			checkArgument(existingParent.isDir(), "a file is already registered for the directory path");
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Test Utility
+	// ------------------------------------------------------------------------
+
+	public void register() throws Exception {
+		final Object key = createFsKey(scheme);
+		final Map<Object, Object> fsMap = getFsRegistry();
+		fsMap.put(key, this);
+	}
+
+	public void unregister() throws Exception {
+		final Object key = createFsKey(scheme);
+		final Map<Object, Object> fsMap = getFsRegistry();
+		fsMap.remove(key);
+	}
+
+	private static Object createFsKey(String scheme) throws Exception {
+		final Class<?> fsKeyClass = Class.forName("org.apache.flink.core.fs.FileSystem$FSKey");
+		final Constructor<?> ctor = fsKeyClass.getConstructor(String.class, String.class);
+		ctor.setAccessible(true);
+		return ctor.newInstance(scheme, null);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static Map<Object, Object> getFsRegistry() throws Exception {
+		final Field cacheField = FileSystem.class.getDeclaredField("CACHE");
+		cacheField.setAccessible(true);
+		return (Map<Object, Object>) cacheField.get(null);
+	}
+
+	// ------------------------------------------------------------------------
+	//  File System Methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Path getWorkingDirectory() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Path getHomeDirectory() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public URI getUri() {
+		return URI.create(scheme + "://");
+	}
+
+	@Override
+	public FileStatus getFileStatus(Path f) throws IOException {
+		final FileStatus status = files.get(f);
+		if (status != null) {
+			return status;
+		} else {
+			throw new FileNotFoundException("File not found: " + f);
+		}
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+		final TestFileStatus status = file instanceof TestFileStatus
+				? (TestFileStatus) file
+				: files.get(file.getPath());
+
+		if (status == null) {
+			throw new FileNotFoundException(file.getPath().toString());
+		}
+
+		return status.getBlocks();
+	}
+
+	@Override
+	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+		return open(f);
+	}
+
+	@Override
+	public FSDataInputStream open(Path f) throws IOException {
+		final TestFileStatus status = (TestFileStatus) getFileStatus(f);
+		if (status.stream != null) {
+			return status.stream;
+		} else {
+			throw new UnsupportedOperationException("No stream registered for this file");
+		}
+	}
+
+	@Override
+	public FileStatus[] listStatus(Path f) throws IOException {
+		final Collection<FileStatus> dirContents = directories.get(f);
+		if (dirContents != null) {
+			return dirContents.toArray(new FileStatus[dirContents.size()]);
+		} else {
+			throw new FileNotFoundException("Directory not found: " + f);
+		}
+	}
+
+	@Override
+	public boolean delete(Path f, boolean recursive) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean mkdirs(Path f) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean rename(Path src, Path dst) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return false;
+	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.FILE_SYSTEM;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test implementation of a {@link FileStatus}.
+	 */
+	public static final class TestFileStatus implements FileStatus {
+
+		private static final long TIME = System.currentTimeMillis();
+
+		public static TestFileStatus forFileWithDefaultBlock(final Path path, final long len) {
+			return forFileWithBlocks(path, len, new TestBlockLocation(0L, len));
+		}
+
+		public static TestFileStatus forFileWithBlocks(final Path path, final long len, final BlockLocation... blocks) {
+			checkNotNull(blocks);
+			return new TestFileStatus(path, len, false, blocks, null);
+		}
+
+		public static TestFileStatus forFileWithStream(final Path path, final long len, final FSDataInputStream stream) {
+			return new TestFileStatus(path, len, false, new BlockLocation[] {new TestBlockLocation(0L, len)}, stream);
+		}
+
+		public static TestFileStatus forDirectory(final Path path) {
+			return new TestFileStatus(path, 4096L, true, null, null);
+		}
+
+		// ------------------------------------------------
+
+		private final Path path;
+		private final long len;
+		private final boolean isDir;
+		private @Nullable final BlockLocation[] blocks;
+		private @Nullable final FSDataInputStream stream;
+
+		private TestFileStatus(
+				final Path path,
+				final long len,
+				final boolean isDir,
+				@Nullable final BlockLocation[] blocks,
+				@Nullable final FSDataInputStream stream) {
+			this.path = path;
+			this.len = len;
+			this.isDir = isDir;
+			this.blocks = blocks;
+			this.stream = stream;
+		}
+
+		@Override
+		public long getLen() {
+			return len;
+		}
+
+		@Override
+		public long getBlockSize() {
+			return 64 << 20;
+		}
+
+		@Override
+		public short getReplication() {
+			return 1;
+		}
+
+		@Override
+		public long getModificationTime() {
+			return TIME;
+		}
+
+		@Override
+		public long getAccessTime() {
+			return TIME;
+		}
+
+		@Override
+		public boolean isDir() {
+			return isDir;
+		}
+
+		@Override
+		public Path getPath() {
+			return path;
+		}
+
+		public BlockLocation[] getBlocks() {
+			return blocks;
+		}
+
+		@Override
+		public String toString() {
+			return path.toString() + (isDir ? " [DIR]" : " [FILE]");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test implementation of a {@link BlockLocation}.
+	 */
+	public static final class TestBlockLocation implements BlockLocation {
+
+		private final String[] hosts;
+		private final long offset;
+		private final long length;
+
+		public TestBlockLocation(long offset, long length, String... hosts) {
+			checkArgument(offset >= 0);
+			checkArgument(length >= 0);
+			this.offset = offset;
+			this.length = length;
+			this.hosts = checkNotNull(hosts);
+		}
+
+		@Override
+		public String[] getHosts() throws IOException {
+			return hosts;
+		}
+
+		@Override
+		public long getOffset() {
+			return offset;
+		}
+
+		@Override
+		public long getLength() {
+			return length;
+		}
+
+		@Override
+		public int compareTo(BlockLocation o) {
+			return 0;
+		}
+	}
+}
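
As the class Javadoc above describes, a test registers the TestingFileSystem under its scheme so that Path#getFileSystem() resolves to it, and unregisters it afterwards. A minimal usage sketch, assuming a made-up "test" scheme and file paths:

    import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public class TestingFileSystemUsageSketch {

        public static void main(String[] args) throws Exception {
            final TestingFileSystem testFs = TestingFileSystem.createWithFiles(
                    "test", new Path("test:///dir/file-1"), new Path("test:///dir/file-2"));

            // make the "test" scheme resolvable through the regular FileSystem registry
            testFs.register();
            try {
                final FileSystem fs = new Path("test:///dir/file-1").getFileSystem();
                System.out.println(fs.getFileStatus(new Path("test:///dir/file-1")).getLen());
            } finally {
                // always unregister so the scheme does not leak into other tests
                testFs.unregister();
            }
        }
    }
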
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java
new file mode 100644
index 0000000..a46ffcd
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link ArrayResultIterator}.
+ */
+public class ArrayResultIteratorTest {
+
+	@Test
+	public void testEmptyConstruction() {
+		final ArrayResultIterator<Object> iter = new ArrayResultIterator<>();
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testGetElements() {
+		final String[] elements = new String[] { "1", "2", "3", "4"};
+		final long initialPos = 1422;
+		final long initialSkipCount = 17;
+
+		final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+		iter.set(elements, elements.length, initialPos, initialSkipCount);
+
+		for (int i = 0; i < elements.length; i++) {
+			final RecordAndPosition<String> recAndPos = iter.next();
+			assertEquals(elements[i], recAndPos.getRecord());
+			assertEquals(initialPos, recAndPos.getOffset());
+			assertEquals(initialSkipCount + i + 1, recAndPos.getRecordSkipCount());
+		}
+	}
+
+	@Test
+	public void testExhausted() {
+		final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+		iter.set(new String[] { "1", "2"}, 2, 0L, 0L);
+
+		iter.next();
+		iter.next();
+
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testArraySubRange() {
+		final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+		iter.set(new String[] { "1", "2", "3"}, 2, 0L, 0L);
+
+		assertNotNull(iter.next());
+		assertNotNull(iter.next());
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testNoRecycler() {
+		final ArrayResultIterator<Object> iter = new ArrayResultIterator<>();
+		iter.releaseBatch();
+	}
+
+	@Test
+	public void testRecycler() {
+		final AtomicBoolean recycled = new AtomicBoolean();
+		final ArrayResultIterator<Object> iter = new ArrayResultIterator<>(() -> recycled.set(true));
+
+		iter.releaseBatch();
+
+		assertTrue(recycled.get());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java
new file mode 100644
index 0000000..e355e79
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for the {@link IteratorResultIterator}.
+ */
+public class IteratorResultIteratorTest {
+
+	@Test
+	public void testGetElements() {
+		final String[] elements = new String[] { "1", "2", "3", "4"};
+		final long initialPos = 1422;
+		final long initialSkipCount = 17;
+
+		final IteratorResultIterator<String> iter = new IteratorResultIterator<>(
+				Arrays.asList(elements).iterator(), initialPos, initialSkipCount);
+
+		for (int i = 0; i < elements.length; i++) {
+			final RecordAndPosition<String> recAndPos = iter.next();
+			assertEquals(elements[i], recAndPos.getRecord());
+			assertEquals(initialPos, recAndPos.getOffset());
+			assertEquals(initialSkipCount + i + 1, recAndPos.getRecordSkipCount());
+		}
+	}
+
+	@Test
+	public void testExhausted() {
+		final IteratorResultIterator<String> iter = new IteratorResultIterator<>(
+				Arrays.asList("1", "2").iterator(), 0L, 0L);
+
+		iter.next();
+		iter.next();
+
+		assertNull(iter.next());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java
new file mode 100644
index 0000000..e1bbcd4
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link SingletonResultIterator}.
+ */
+public class SingletonResultIteratorTest {
+
+	@Test
+	public void testEmptyConstruction() {
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testGetElement() {
+		final Object element = new Object();
+		final long pos = 1422;
+		final long skipCount = 17;
+
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+		iter.set(element, pos, skipCount);
+
+		final RecordAndPosition<Object> record = iter.next();
+		assertNotNull(record);
+		assertEquals(element, record.getRecord());
+		assertEquals(pos, record.getOffset());
+		assertEquals(skipCount, record.getRecordSkipCount());
+	}
+
+	@Test
+	public void testExhausted() {
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+		iter.set(new Object(), 1, 2);
+		iter.next();
+
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testNoRecycler() {
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+		iter.releaseBatch();
+	}
+
+	@Test
+	public void testRecycler() {
+		final AtomicBoolean recycled = new AtomicBoolean();
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+
+		iter.releaseBatch();
+
+		assertTrue(recycled.get());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..b4d2ba9
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs;
+# set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 8a0d867..9401351 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -55,6 +55,7 @@ under the License.
 		<module>flink-connector-gcp-pubsub</module>
 		<module>flink-connector-kinesis</module>
 		<module>flink-connector-base</module>
+		<module>flink-connector-files</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index b8a005c..d22b0f1 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -144,6 +144,12 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-files</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<!-- Default file system support. The Hadoop and MapR dependencies -->
 		<!--       are optional, so not being added to the dist jar        -->
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 1af572c..05b3216 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -173,11 +173,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 			sourceReader.addSplits(splits);
 		}
 
-		// Start the reader.
-		sourceReader.start();
 		// Register the reader to the coordinator.
 		registerReader();
 
+		// Start the reader after registration; sending messages from within start() is then allowed.
+		sourceReader.start();
+
 		eventTimeLogic.startPeriodicWatermarkEmits();
 	}
 


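The reordering above matters because a reader may want to send a message to the coordinator from within start(). A small self-contained model of that constraint (plain Java, not the Flink API; all names are invented for illustration):

    import java.util.ArrayList;
    import java.util.List;

    public class StartAfterRegistrationExample {

        static final class Coordinator {
            private final List<String> registeredReaders = new ArrayList<>();

            void register(String readerId) {
                registeredReaders.add(readerId);
            }

            void handleEvent(String readerId, String event) {
                // events from readers the coordinator has never seen cannot be routed
                if (!registeredReaders.contains(readerId)) {
                    throw new IllegalStateException("event from unregistered reader " + readerId);
                }
                System.out.println("coordinator got '" + event + "' from " + readerId);
            }
        }

        static final class Reader {
            private final String id;
            private final Coordinator coordinator;

            Reader(String id, Coordinator coordinator) {
                this.id = id;
                this.coordinator = coordinator;
            }

            // like SourceReader#start(), this may immediately talk to the coordinator
            void start() {
                coordinator.handleEvent(id, "hello-from-start");
            }
        }

        public static void main(String[] args) {
            Coordinator coordinator = new Coordinator();
            Reader reader = new Reader("reader-0", coordinator);

            coordinator.register("reader-0"); // register first ...
            reader.start();                   // ... then starting the reader is safe
        }
    }
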
[flink] 02/02: [hotfix][runtime] Remove commented-out annotation in SourceOperator

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2da55b8f841ba93e1aa7c56de31e6c5632d725ba
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 21 18:09:43 2020 +0200

    [hotfix][runtime] Remove commented-out annotation in SourceOperator
---
 .../java/org/apache/flink/streaming/api/operators/SourceOperator.java    | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 05b3216..95c9eda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -66,7 +66,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <OUT> The output type of the operator.
  */
 @Internal
-//@SuppressWarnings("serial")
 public class SourceOperator<OUT, SplitT extends SourceSplit>
 		extends AbstractStreamOperator<OUT>
 		implements OperatorEventHandler, PushingAsyncDataInput<OUT> {