Posted to commits@flink.apache.org by se...@apache.org on 2020/09/21 19:17:15 UTC

[flink] 01/02: [FLINK-19161][file connector] Add first version of the FLIP-27 File Source

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90d01f7409c579313e70fcbfa9e0342e6af50ff8
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 11:00:59 2020 +0200

    [FLINK-19161][file connector] Add first version of the FLIP-27 File Source
    
    This closes #13401
---
 .../SingleThreadMultiplexSourceReaderBase.java     |  63 ++-
 .../source/reader/mocks/TestingReaderContext.java  |   4 +-
 flink-connectors/flink-connector-files/pom.xml     |  83 ++++
 .../file/src/ContinuousEnumerationSettings.java    |  55 +++
 .../flink/connector/file/src/FileSource.java       | 437 +++++++++++++++++++++
 .../flink/connector/file/src/FileSourceSplit.java  | 219 +++++++++++
 .../file/src/FileSourceSplitSerializer.java        | 128 ++++++
 .../connector/file/src/FileSourceSplitState.java   | 106 +++++
 .../file/src/PendingSplitsCheckpoint.java          |  93 +++++
 .../src/PendingSplitsCheckpointSerializer.java     | 150 +++++++
 .../file/src/assigners/FileSplitAssigner.java      |  71 ++++
 .../file/src/assigners/SimpleSplitAssigner.java    |  65 +++
 .../src/compression/StandardDeCompressors.java     | 105 +++++
 .../BlockSplittingRecursiveEnumerator.java         | 150 +++++++
 .../file/src/enumerate/DefaultFileFilter.java      |  42 ++
 .../file/src/enumerate/FileEnumerator.java         |  57 +++
 .../enumerate/NonSplittingRecursiveEnumerator.java | 144 +++++++
 .../src/impl/ContinuousFileSplitEnumerator.java    | 162 ++++++++
 .../file/src/impl/FileRecordFormatAdapter.java     | 168 ++++++++
 .../flink/connector/file/src/impl/FileRecords.java | 109 +++++
 .../connector/file/src/impl/FileSourceReader.java  |  72 ++++
 .../file/src/impl/FileSourceRecordEmitter.java     |  47 +++
 .../file/src/impl/FileSourceSplitReader.java       | 116 ++++++
 .../file/src/impl/StaticFileSplitEnumerator.java   | 128 ++++++
 .../file/src/impl/StreamFormatAdapter.java         | 269 +++++++++++++
 .../connector/file/src/reader/BulkFormat.java      | 199 ++++++++++
 .../file/src/reader/FileRecordFormat.java          | 200 ++++++++++
 .../file/src/reader/SimpleStreamFormat.java        | 115 ++++++
 .../connector/file/src/reader/StreamFormat.java    | 219 +++++++++++
 .../connector/file/src/reader/TextLineFormat.java  |  96 +++++
 .../file/src/util/ArrayResultIterator.java         |  90 +++++
 .../file/src/util/CheckpointedPosition.java        | 107 +++++
 .../file/src/util/IteratorResultIterator.java      |  95 +++++
 .../file/src/util/MutableRecordAndPosition.java    |  57 +++
 .../apache/flink/connector/file/src/util/Pool.java | 115 ++++++
 .../connector/file/src/util/RecordAndPosition.java |  90 +++++
 .../file/src/util/RecyclableIterator.java          |  50 +++
 .../file/src/util/SingletonResultIterator.java     |  75 ++++
 .../flink/connector/file/src/util/Utils.java       |  72 ++++
 .../file/src/FileSourceHeavyThroughputTest.java    | 230 +++++++++++
 .../file/src/FileSourceSplitSerializerTest.java    | 128 ++++++
 .../file/src/FileSourceSplitStateTest.java         |  82 ++++
 .../connector/file/src/FileSourceSplitTest.java    |  34 ++
 .../file/src/FileSourceTextLinesITCase.java        | 273 +++++++++++++
 .../src/PendingSplitsCheckpointSerializerTest.java | 150 +++++++
 .../BlockSplittingRecursiveEnumeratorTest.java     |  62 +++
 .../NonSplittingRecursiveEnumeratorTest.java       | 204 ++++++++++
 .../connector/file/src/impl/AdapterTestBase.java   | 284 +++++++++++++
 .../file/src/impl/FileRecordFormatAdapterTest.java | 199 ++++++++++
 .../connector/file/src/impl/FileRecordsTest.java   | 126 ++++++
 .../file/src/impl/StreamFormatAdapterTest.java     | 197 ++++++++++
 .../connector/file/src/impl/TestIntReader.java     |  90 +++++
 .../file/src/testutils/TestingFileSystem.java      | 391 ++++++++++++++++++
 .../file/src/util/ArrayResultIteratorTest.java     |  94 +++++
 .../file/src/util/IteratorResultIteratorTest.java  |  60 +++
 .../file/src/util/SingletonResultIteratorTest.java |  81 ++++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 flink-connectors/pom.xml                           |   1 +
 flink-dist/pom.xml                                 |   6 +
 .../streaming/api/operators/SourceOperator.java    |   5 +-
 60 files changed, 7335 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index ab87db0..f5806d1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -26,24 +26,67 @@ import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcher
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
+import java.util.Collection;
 import java.util.function.Supplier;
 
 /**
- * A abstract {@link SourceReader} implementation that assign all the splits to a single thread to consume.
- * @param <E>
- * @param <T>
- * @param <SplitT>
- * @param <SplitStateT>
+ * A base for {@link SourceReader}s that read splits with one thread using one {@link SplitReader}.
+ * The splits can be read either one after the other (like in a file source) or concurrently by changing
+ * the subscription in the split reader (like in the Kafka Source).
+ *
+ * <p>To implement a source reader based on this class, implementors need to supply the following:
+ * <ul>
+ *   <li>A {@link SplitReader}, which connects to the source and reads/polls data. The split reader
+ *       gets notified whenever there is a new split. The split reader would read files, contain a
+ *       Kafka or other source client, etc.</li>
+ *   <li>A {@link RecordEmitter} that takes a record from the Split Reader and updates the checkpointing
+ *       state and converts it into the final form. For example for Kafka, the Record Emitter takes a
+ *       {@code ConsumerRecord}, puts the offset information into state, transforms the records with the
+ *       deserializers into the final type, and emits the record.</li>
+ *   <li>The class must override the methods to convert back and forth between the immutable splits
+ *       ({@code SplitT}) and the mutable split state representation ({@code SplitStateT}).</li>
+ *   <li>Finally, the reader must decide what to do when it starts ({@link #start()}) or when a split is
+ *       finished ({@link #onSplitFinished(Collection)}).</li>
+ * </ul>
+ *
+ * @param <E> The type of the records (the raw type that typically contains checkpointing information).
+ * @param <T> The final type of the records emitted by the source.
+ * @param <SplitT> The type of the splits processed by the source.
+ * @param <SplitStateT> The type of the mutable state per split.
  */
 public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
 	extends SourceReaderBase<E, T, SplitT, SplitStateT> {
 
+	/**
+	 * The primary constructor for the source reader.
+	 *
+	 * <p>The reader will use a handover queue sized as configured via
+	 * {@link SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
+	 */
+	public SingleThreadMultiplexSourceReaderBase(
+			Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+			RecordEmitter<E, T, SplitStateT> recordEmitter,
+			Configuration config,
+			SourceReaderContext context) {
+		this(
+			new FutureCompletingBlockingQueue<>(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
+			splitReaderSupplier,
+			recordEmitter,
+			config,
+			context);
+	}
+
+	/**
+	 * This constructor behaves like
+	 * {@link #SingleThreadMultiplexSourceReaderBase(Supplier, RecordEmitter, Configuration, SourceReaderContext)},
+	 * but accepts a specific {@link FutureCompletingBlockingQueue}.
+	 */
 	public SingleThreadMultiplexSourceReaderBase(
-		FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-		Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
-		RecordEmitter<E, T, SplitStateT> recordEmitter,
-		Configuration config,
-		SourceReaderContext context) {
+			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+			Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+			RecordEmitter<E, T, SplitStateT> recordEmitter,
+			Configuration config,
+			SourceReaderContext context) {
 		super(
 			elementsQueue,
 			new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier),
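
As a hedged aside on the new convenience constructor: the hand-over queue it creates is sized via SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, so readers built on this class can be tuned purely through configuration. A minimal sketch, not part of this commit; the setInteger accessor is assumed from Flink's Configuration API and the value is arbitrary:

    // Sketch only: sizing the hand-over queue used by the convenience constructor above.
    final Configuration config = new Configuration();
    config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 4); // hand over up to 4 fetch batches
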
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
index 02faf1f..5bb9b59 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
@@ -64,7 +64,9 @@ public class TestingReaderContext implements SourceReaderContext {
 	}
 
 	@Override
-	public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+	public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+		sentEvents.add(sourceEvent);
+	}
 
 	// ------------------------------------------------------------------------
 
diff --git a/flink-connectors/flink-connector-files/pom.xml b/flink-connectors/flink-connector-files/pom.xml
new file mode 100644
index 0000000..e1a5149
--- /dev/null
+++ b/flink-connectors/flink-connector-files/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.12-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-files</artifactId>
+	<name>Flink : Connectors : Files</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>1.12-SNAPSHOT</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+</project>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
new file mode 100644
index 0000000..4123c43
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Settings describing how to do continuous file discovery and enumeration when the
+ * file source runs in continuous streaming mode.
+ */
+@Internal
+final class ContinuousEnumerationSettings implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Duration discoveryInterval;
+
+	ContinuousEnumerationSettings(Duration discoveryInterval) {
+		this.discoveryInterval = checkNotNull(discoveryInterval);
+	}
+
+	public Duration getDiscoveryInterval() {
+		return discoveryInterval;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "ContinuousEnumerationSettings{" +
+			"discoveryInterval=" + discoveryInterval +
+			'}';
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
new file mode 100644
index 0000000..296abf9
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
+import org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator;
+import org.apache.flink.connector.file.src.impl.FileRecordFormatAdapter;
+import org.apache.flink.connector.file.src.impl.FileSourceReader;
+import org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator;
+import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified data source that reads files - both in batch and in streaming mode.
+ *
+ * <p>This source supports all (distributed) file systems and object stores that can be
+ * accessed via the Flink's {@link FileSystem} class.
+ *
+ * <p>Start building a file source via one of the following calls:
+ * <ul>
+ *   <li>{@link FileSource#forRecordStreamFormat(StreamFormat, Path...)}</li>
+ *   <li>{@link FileSource#forBulkFileFormat(BulkFormat, Path...)}</li>
+ *   <li>{@link FileSource#forRecordFileFormat(FileRecordFormat, Path...)}</li>
+ * </ul>
+ * This creates a {@link FileSource.FileSourceBuilder} on which you can configure all the
+ * properties of the file source.
+ *
+ * <h2>Batch and Streaming</h2>
+ *
+ * <p>This source supports both bounded/batch and continuous/streaming data inputs. For the
+ * bounded/batch case, the file source processes all files under the given path(s).
+ * In the continuous/streaming case, the source periodically checks the paths for new files
+ * and will start reading those.
+ *
+ * <p>When you start creating a file source (via the {@link FileSource.FileSourceBuilder} created
+ * through one of the above-mentioned methods) the source is by default in bounded/batch mode.
+ * Call {@link FileSource.FileSourceBuilder#monitorContinuously(Duration)} to put the source
+ * into continuous streaming mode.
+ *
+ * <h2>Format Types</h2>
+ *
+ * <p>The reading of each file happens through file readers defined by <i>file formats</i>.
+ * These define the parsing logic for the contents of the file. There are multiple classes that
+ * the source supports. Their interfaces trade off simplicity of implementation against flexibility/efficiency.
+ * <ul>
+ *     <li>A {@link StreamFormat} reads the contents of a file from a file stream. It is the simplest
+ *         format to implement, and provides many features out-of-the-box (like checkpointing logic)
+ *         but is limited in the optimizations it can apply (such as object reuse, batching, etc.).</li>
+ *     <li>A {@link BulkFormat} reads batches of records from a file at a time. It is the most "low level"
+ *         format to implement, but offers the greatest flexibility to optimize the implementation.</li>
+ *     <li>A {@link FileRecordFormat} is in the middle of the trade-off spectrum between the
+ *         {@code StreamFormat} and the {@code BulkFormat}.</li>
+ * </ul>
+ *
+ * <h2>Discovering / Enumerating Files</h2>
+ *
+ * <p>The way that the source lists the files to be processed is defined by the {@link FileEnumerator}.
+ * The {@code FileEnumerator} is responsible for selecting the relevant files (for example, filtering out
+ * hidden files) and for optionally splitting files into multiple regions (= file source splits) that
+ * can be read in parallel.
+ *
+ * @param <T> The type of the events/records produced by this source.
+ */
+@PublicEvolving
+public final class FileSource<T> implements Source<T, FileSourceSplit, PendingSplitsCheckpoint>, ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * The default split assigner, a lazy non-locality-aware assigner.
+	 */
+	public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER = SimpleSplitAssigner::new;
+
+	/**
+	 * The default file enumerator used for splittable formats.
+	 * The enumerator recursively enumerates files, splits files that consist of multiple distributed storage
+	 * blocks into multiple splits, and filters hidden files (files starting with '.' or '_').
+	 * Files with suffixes of common compression formats (for example '.gzip', '.bz2', '.xz', '.zip', ...)
+	 * will not be split.
+	 */
+	public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR = BlockSplittingRecursiveEnumerator::new;
+
+	/**
+	 * The default file enumerator used for non-splittable formats.
+	 * The enumerator recursively enumerates files, creates one split for the file, and filters hidden
+	 * files (files starting with '.' or '_').
+	 */
+	public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR = NonSplittingRecursiveEnumerator::new;
+
+	// ------------------------------------------------------------------------
+
+	private final Path[] inputPaths;
+
+	private final FileEnumerator.Provider enumeratorFactory;
+
+	private final FileSplitAssigner.Provider assignerFactory;
+
+	private final BulkFormat<T> readerFormat;
+
+	@Nullable
+	private final ContinuousEnumerationSettings continuousEnumerationSettings;
+
+	// ------------------------------------------------------------------------
+
+	private FileSource(
+			final Path[] inputPaths,
+			final FileEnumerator.Provider fileEnumerator,
+			final FileSplitAssigner.Provider splitAssigner,
+			final BulkFormat<T> readerFormat,
+			@Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) {
+
+		checkArgument(inputPaths.length > 0);
+		this.inputPaths = inputPaths;
+		this.enumeratorFactory = checkNotNull(fileEnumerator);
+		this.assignerFactory = checkNotNull(splitAssigner);
+		this.readerFormat = checkNotNull(readerFormat);
+		this.continuousEnumerationSettings = continuousEnumerationSettings;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Source API Methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Boundedness getBoundedness() {
+		return continuousEnumerationSettings == null ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+	}
+
+	@Override
+	public SourceReader<T, FileSourceSplit> createReader(SourceReaderContext readerContext) {
+		return new FileSourceReader<>(readerContext, readerFormat, readerContext.getConfiguration());
+	}
+
+	@Override
+	public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> createEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> enumContext) {
+
+		final FileEnumerator enumerator = enumeratorFactory.create();
+
+		// read the initial set of splits (which is also the total set of splits for bounded sources)
+		final Collection<FileSourceSplit> splits;
+		try {
+			// TODO - in the next cleanup pass, we should try to remove the need to "wrap unchecked" here
+			splits = enumerator.enumerateSplits(inputPaths, enumContext.currentParallelism());
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Could not enumerate file splits", e);
+		}
+
+		return createSplitEnumerator(enumContext, enumerator, splits, null);
+	}
+
+	@Override
+	public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> enumContext,
+			PendingSplitsCheckpoint checkpoint) throws IOException {
+
+		final FileEnumerator enumerator = enumeratorFactory.create();
+
+		return createSplitEnumerator(enumContext, enumerator, checkpoint.getSplits(), checkpoint.getAlreadyProcessedPaths());
+	}
+
+	@Override
+	public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
+		return FileSourceSplitSerializer.INSTANCE;
+	}
+
+	@Override
+	public SimpleVersionedSerializer<PendingSplitsCheckpoint> getEnumeratorCheckpointSerializer() {
+		return PendingSplitsCheckpointSerializer.INSTANCE;
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return readerFormat.getProducedType();
+	}
+
+	// ------------------------------------------------------------------------
+	//  helpers
+	// ------------------------------------------------------------------------
+
+	private SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> createSplitEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> context,
+			FileEnumerator enumerator,
+			Collection<FileSourceSplit> splits,
+			@Nullable Collection<Path> alreadyProcessedPaths) {
+
+		final FileSplitAssigner splitAssigner = assignerFactory.create(splits);
+
+		if (continuousEnumerationSettings == null) {
+			// bounded case
+			return new StaticFileSplitEnumerator(context, splitAssigner);
+		} else {
+			// unbounded case
+			if (alreadyProcessedPaths == null) {
+				alreadyProcessedPaths = splitsToPaths(splits);
+			}
+
+			return new ContinuousFileSplitEnumerator(
+					context,
+					enumerator,
+					splitAssigner,
+					inputPaths,
+					alreadyProcessedPaths,
+					continuousEnumerationSettings.getDiscoveryInterval().toMillis());
+		}
+	}
+
+	private static Collection<Path> splitsToPaths(Collection<FileSourceSplit> splits) {
+		return splits.stream()
+			.map(FileSourceSplit::path)
+			.collect(Collectors.toCollection(HashSet::new));
+	}
+
+	// ------------------------------------------------------------------------
+	//  Entry-point Factory Methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Builds a new {@code FileSource} using a {@link StreamFormat} to read record-by-record from a
+	 * file stream.
+	 *
+	 * <p>When possible, stream-based formats are generally easier to implement than (and preferable to)
+	 * file-based formats, because they support better default behavior around I/O batching and better
+	 * progress tracking to avoid re-doing work on recovery.
+	 */
+	public static <T> FileSourceBuilder<T> forRecordStreamFormat(final StreamFormat<T> reader, final Path... paths) {
+		checkNotNull(reader, "reader");
+		checkNotNull(paths, "paths");
+		checkArgument(paths.length > 0, "paths must not be empty");
+
+		final BulkFormat<T> bulkFormat = new StreamFormatAdapter<>(reader);
+		return new FileSourceBuilder<>(paths, bulkFormat);
+	}
+
+	/**
+	 * Builds a new {@code FileSource} using a {@link BulkFormat} to read batches of records
+	 * from files.
+	 *
+	 * <p>Examples for bulk readers are compressed and vectorized formats such as ORC or Parquet.
+	 */
+	public static <T> FileSourceBuilder<T> forBulkFileFormat(final BulkFormat<T> reader, final Path... paths) {
+		checkNotNull(reader, "reader");
+		checkNotNull(paths, "paths");
+		checkArgument(paths.length > 0, "paths must not be empty");
+
+		return new FileSourceBuilder<>(paths, reader);
+	}
+
+	/**
+	 * Builds a new {@code FileSource} using a {@link FileRecordFormat} to read record-by-record from a
+	 * file path.
+	 *
+	 * <p>A {@code FileRecordFormat} is more general than the {@link StreamFormat}, but also
+	 * often requires more careful parametrization.
+	 */
+	public static <T> FileSourceBuilder<T> forRecordFileFormat(final FileRecordFormat<T> reader, final Path... paths) {
+		checkNotNull(reader, "reader");
+		checkNotNull(paths, "paths");
+		checkArgument(paths.length > 0, "paths must not be empty");
+
+		final BulkFormat<T> bulkFormat = new FileRecordFormatAdapter<>(reader);
+		return new FileSourceBuilder<>(paths, bulkFormat);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Builder
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The builder for the {@code FileSource}, to configure the various behaviors.
+	 *
+	 * <p>Start building the source via one of the following methods:
+	 * <ul>
+	 *   <li>{@link FileSource#forRecordStreamFormat(StreamFormat, Path...)}</li>
+	 *   <li>{@link FileSource#forBulkFileFormat(BulkFormat, Path...)}</li>
+	 *   <li>{@link FileSource#forRecordFileFormat(FileRecordFormat, Path...)}</li>
+	 * </ul>
+	 */
+	public static final class FileSourceBuilder<T> extends AbstractFileSourceBuilder<T, FileSourceBuilder<T>> {
+		public FileSourceBuilder(Path[] inputPaths, BulkFormat<T> readerFormat) {
+			super(inputPaths, readerFormat);
+		}
+	}
+
+	/**
+	 * The generic base builder. This builder carries a <i>SELF</i> type to make it convenient to
+	 * extend this for subclasses, using the following pattern.
+	 * <pre>{@code
+	 * public class SubBuilder<T> extends AbstractFileSourceBuilder<T, SubBuilder<T>> {
+	 *     ...
+	 * }
+	 * }</pre>
+	 * That way, all return values from the builder methods defined here are typed to the sub-class
+	 * type and support fluent chaining.
+	 *
+	 * <p>We don't make the publicly visible builder generic with a SELF type, because it leads to
+	 * generic signatures that can look complicated and confusing.
+	 */
+	public static class AbstractFileSourceBuilder<T, SELF extends AbstractFileSourceBuilder<T, SELF>> {
+
+		// mandatory - have no defaults
+		private final Path[] inputPaths;
+		private final BulkFormat<T> readerFormat;
+
+		// optional - have defaults
+		private FileEnumerator.Provider fileEnumerator;
+		private FileSplitAssigner.Provider splitAssigner;
+		@Nullable
+		private ContinuousEnumerationSettings continuousSourceSettings;
+
+		protected AbstractFileSourceBuilder(Path[] inputPaths, BulkFormat<T> readerFormat) {
+			this.inputPaths = checkNotNull(inputPaths);
+			this.readerFormat = checkNotNull(readerFormat);
+
+			this.fileEnumerator = readerFormat.isSplittable()
+					? DEFAULT_SPLITTABLE_FILE_ENUMERATOR
+					: DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR;
+			this.splitAssigner = DEFAULT_SPLIT_ASSIGNER;
+		}
+
+		/**
+		 * Creates the file source with the settings applied to this builder.
+		 */
+		public FileSource<T> build() {
+			return new FileSource<>(
+				inputPaths,
+				fileEnumerator,
+				splitAssigner,
+				readerFormat,
+				continuousSourceSettings);
+		}
+
+		/**
+		 * Sets this source to streaming ("continuous monitoring") mode.
+		 *
+		 * <p>This makes the source a "continuous streaming" source that keeps running, monitoring
+		 * for new files, and reads these files when they appear and are discovered by the monitoring.
+		 *
+		 * <p>The interval in which the source checks for new files is the {@code discoveryInterval}.
+		 * Shorter intervals mean that files are discovered more quickly, but also imply more frequent
+		 * listing or directory traversal of the file system / object store.
+		 */
+		public SELF monitorContinuously(Duration discoveryInterval) {
+			checkNotNull(discoveryInterval, "discoveryInterval");
+			checkArgument(!(discoveryInterval.isNegative() || discoveryInterval.isZero()), "discoveryInterval must be > 0");
+
+			this.continuousSourceSettings = new ContinuousEnumerationSettings(discoveryInterval);
+			return self();
+		}
+
+		/**
+		 * Sets this source to bounded (batch) mode.
+		 *
+		 * <p>In this mode, the source processes the files that are under the given paths when the
+		 * application is started. Once all files are processed, the source will finish.
+		 *
+		 * <p>This setting is also the default behavior. This method is mainly here to "switch back" to
+		 * bounded (batch) mode, or to make it explicit in the source construction.
+		 */
+		public SELF processStaticFileSet() {
+			this.continuousSourceSettings = null;
+			return self();
+		}
+
+		/**
+		 * Configures the {@link FileEnumerator} for the source.
+		 * The File Enumerator is responsible for selecting from the input path the set of files
+		 * that should be processed (and which to filter out). Furthermore, the File Enumerator
+		 * may split the files further into sub-regions, to enable parallelization beyond the number
+		 * of files.
+		 */
+		public SELF setFileEnumerator(FileEnumerator.Provider fileEnumerator) {
+			this.fileEnumerator = checkNotNull(fileEnumerator);
+			return self();
+		}
+
+		/**
+		 * Configures the {@link FileSplitAssigner} for the source.
+		 * The File Split Assigner determines which parallel reader instance gets which
+		 * {@link FileSourceSplit}, and in which order these splits are assigned.
+		 */
+		public SELF setSplitAssigner(FileSplitAssigner.Provider splitAssigner) {
+			this.splitAssigner = checkNotNull(splitAssigner);
+			return self();
+		}
+
+		@SuppressWarnings("unchecked")
+		private SELF self() {
+			return (SELF) this;
+		}
+	}
+}
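
To make the builder entry points above concrete, here is a hedged usage sketch (not taken from this commit) that builds one bounded and one continuously monitoring source from the TextLineFormat added in this change; the path is a placeholder and TextLineFormat's no-argument constructor is assumed:

    // Bounded (batch) mode: process all text files under the path once, then finish.
    final FileSource<String> batchSource = FileSource
            .forRecordStreamFormat(new TextLineFormat(), new Path("/data/logs"))
            .build();

    // Continuous streaming mode: keep monitoring the path and read new files as they appear.
    final FileSource<String> streamingSource = FileSource
            .forRecordStreamFormat(new TextLineFormat(), new Path("/data/logs"))
            .monitorContinuously(Duration.ofSeconds(30))
            .build();
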
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
new file mode 100644
index 0000000..7f85618
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceSplit} that represents a file, or a region of a file.
+ *
+ * <p>The split has an offset and an end, which defines the region of the file represented by
+ * the split. For splits representing the whole file, the offset is zero and the length is the
+ * file size.
+ *
+ * <p>The split may furthermore have a "reader position", which is the checkpointed position from
+ * a reader previously reading this split. This position is typically null when the split is assigned
+ * from the enumerator to the readers, and is non-null when the readers checkpoint their state
+ * in a file source split.
+ *
+ * <p>This class is {@link Serializable} for convenience. For Flink's internal serialization (both for
+ * RPC and for checkpoints), the {@link FileSourceSplitSerializer} is used.
+ */
+@PublicEvolving
+public class FileSourceSplit implements SourceSplit, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final String[] NO_HOSTS = StringUtils.EMPTY_STRING_ARRAY;
+
+	/** The unique ID of the split. Unique within the scope of this source. */
+	private final String id;
+
+	/** The path of the file referenced by this split. */
+	private final Path filePath;
+
+	/** The position of the first byte in the file to process. */
+	private final long offset;
+
+	/** The number of bytes in the file to process. */
+	private final long length;
+
+	/** The names of the hosts storing this range of the file. Empty, if no host information is available. */
+	private final String[] hostnames;
+
+	/** The precise reader position in the split, to resume from. */
+	@Nullable
+	private final CheckpointedPosition readerPosition;
+
+	/** The splits are frequently serialized into checkpoints.
+	 * Caching the byte representation makes repeated serialization cheap.
+	 * This field is used by {@link FileSourceSplitSerializer}. */
+	@Nullable
+	transient byte[] serializedFormCache;
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Constructs a split without host information.
+	 *
+	 * @param id The unique ID of this source split.
+	 * @param filePath The path to the file.
+	 * @param offset The start (inclusive) of the split's range in the file.
+	 * @param length The number of bytes in the split (starting from the offset).
+	 */
+	public FileSourceSplit(String id, Path filePath, long offset, long length) {
+		this(id, filePath, offset, length, NO_HOSTS);
+	}
+
+	/**
+	 * Constructs a split with host information.
+	 *
+	 * @param id The unique ID of this source split.
+	 * @param filePath The path to the file.
+	 * @param offset The start (inclusive) of the split's range in the file.
+	 * @param length The number of bytes in the split (starting from the offset).
+	 * @param hostnames The hostnames of the nodes storing the split's file range.
+	 */
+	public FileSourceSplit(String id, Path filePath, long offset, long length, String... hostnames) {
+		this(id, filePath, offset, length, hostnames, null, null);
+	}
+
+	/**
+	 * Constructs a split with host information and a checkpointed reader position.
+	 *
+	 * @param id The unique ID of this source split.
+	 * @param filePath The path to the file.
+	 * @param offset The start (inclusive) of the split's range in the file.
+	 * @param length The number of bytes in the split (starting from the offset).
+	 * @param hostnames The hostnames of the nodes storing the split's file range.
+	 * @param readerPosition The checkpointed position from a reader previously processing this split, or null.
+	 */
+	public FileSourceSplit(
+			String id,
+			Path filePath,
+			long offset,
+			long length,
+			String[] hostnames,
+			@Nullable CheckpointedPosition readerPosition) {
+		this(id, filePath, offset, length, hostnames, readerPosition, null);
+	}
+
+	/**
+	 * Package private constructor, used by the serializers to directly cache the serialized form.
+	 */
+	FileSourceSplit(
+			String id,
+			Path filePath,
+			long offset,
+			long length,
+			String[] hostnames,
+			@Nullable CheckpointedPosition readerPosition,
+			@Nullable byte[] serializedForm) {
+
+		checkArgument(offset >= 0, "offset must be >= 0");
+		checkArgument(length >= 0, "length must be >= 0");
+		checkNoNullHosts(hostnames);
+
+		this.id = checkNotNull(id);
+		this.filePath = checkNotNull(filePath);
+		this.offset = offset;
+		this.length = length;
+		this.hostnames = hostnames;
+		this.readerPosition = readerPosition;
+		this.serializedFormCache = serializedForm;
+	}
+
+	// ------------------------------------------------------------------------
+	//  split properties
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String splitId() {
+		return id;
+	}
+
+	/**
+	 * Gets the file's path.
+	 */
+	public Path path() {
+		return filePath;
+	}
+
+	/**
+	 * Returns the start of the file region referenced by this source split.
+	 * The position is inclusive, the value indicates the first byte that is part of the split.
+	 */
+	public long offset() {
+		return offset;
+	}
+
+	/**
+	 * Returns the number of bytes in the file region described by this source split.
+	 */
+	public long length() {
+		return length;
+	}
+
+	/**
+	 * Gets the hostnames of the nodes storing the file range described by this split.
+	 * The returned array is empty, if no host information is available.
+	 *
+	 * <p>Host information is typically only available on specific file systems, like HDFS.
+	 */
+	public String[] hostnames() {
+		return hostnames;
+	}
+
+	/**
+	 * Gets the (checkpointed) position of the reader, if set.
+	 * This value is typically absent for splits when assigned from the enumerator to the readers,
+	 * and present when the splits are recovered from a checkpoint.
+	 */
+	public Optional<CheckpointedPosition> getReaderPosition() {
+		return Optional.ofNullable(readerPosition);
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		final String hosts = hostnames.length == 0 ? "(no host info)" : " hosts=" + Arrays.toString(hostnames);
+		return String.format("FileSourceSplit: %s [%d, %d) %s ID=%s position=%s",
+				filePath, offset, offset + length, hosts, id, readerPosition);
+	}
+
+	private static void checkNoNullHosts(String[] hosts) {
+		checkNotNull(hosts, "hostnames array must not be null");
+		for (String host : hosts) {
+			checkArgument(host != null, "the hostnames must not contain null entries");
+		}
+	}
+}
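
As a quick, hedged illustration of the constructors above (values are made up): a split covering the first 4 MiB of a file, without host information, could be created like this:

    // Hypothetical id, path and range, for illustration only.
    final FileSourceSplit split = new FileSourceSplit(
            "split-1", new Path("/data/logs/part-0.txt"), 0L, 4L * 1024 * 1024);
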
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
new file mode 100644
index 0000000..1674230
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A serializer for the {@link FileSourceSplit}.
+ */
+@PublicEvolving
+public final class FileSourceSplitSerializer implements SimpleVersionedSerializer<FileSourceSplit> {
+
+	public static final FileSourceSplitSerializer INSTANCE = new FileSourceSplitSerializer();
+
+	private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+			ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
+
+	private static final int VERSION = 1;
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public byte[] serialize(FileSourceSplit split) throws IOException {
+		checkArgument(split.getClass() == FileSourceSplit.class, "Cannot serialize subclasses of FileSourceSplit");
+
+		// optimization: the splits lazily cache their own serialized form
+		if (split.serializedFormCache != null) {
+			return split.serializedFormCache;
+		}
+
+		final DataOutputSerializer out = SERIALIZER_CACHE.get();
+
+		out.writeUTF(split.splitId());
+		split.path().write(out);
+		out.writeLong(split.offset());
+		out.writeLong(split.length());
+		writeStringArray(out, split.hostnames());
+
+		final Optional<CheckpointedPosition> readerPosition = split.getReaderPosition();
+		out.writeBoolean(readerPosition.isPresent());
+		if (readerPosition.isPresent()) {
+			out.writeLong(readerPosition.get().getOffset());
+			out.writeLong(readerPosition.get().getRecordsAfterOffset());
+		}
+
+		final byte[] result = out.getCopyOfBuffer();
+		out.clear();
+
+		// optimization: cache the serialized form, so we avoid the byte work during repeated serialization
+		split.serializedFormCache = result;
+
+		return result;
+	}
+
+	@Override
+	public FileSourceSplit deserialize(int version, byte[] serialized) throws IOException {
+		if (version == 1) {
+			return deserializeV1(serialized);
+		}
+		throw new IOException("Unknown version: " + version);
+	}
+
+	private static FileSourceSplit deserializeV1(byte[] serialized) throws IOException {
+		final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+		final String id = in.readUTF();
+		final Path path = new Path();
+		path.read(in);
+		final long offset = in.readLong();
+		final long len = in.readLong();
+		final String[] hosts = readStringArray(in);
+
+		final CheckpointedPosition readerPosition = in.readBoolean()
+				? new CheckpointedPosition(in.readLong(), in.readLong()) : null;
+
+		// instantiate a new split and cache the serialized form
+		return new FileSourceSplit(id, path, offset, len, hosts, readerPosition, serialized);
+	}
+
+	private static void writeStringArray(DataOutputView out, String[] strings) throws IOException {
+		out.writeInt(strings.length);
+		for (String string : strings) {
+			out.writeUTF(string);
+		}
+	}
+
+	private static String[] readStringArray(DataInputView in) throws IOException {
+		final int len = in.readInt();
+		final String[] strings = new String[len];
+		for (int i = 0; i < len; i++) {
+			strings[i] = in.readUTF();
+		}
+		return strings;
+	}
+}
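
A hedged round-trip sketch for this serializer, reusing the hypothetical split from the earlier example (both calls declare IOException):

    final FileSourceSplitSerializer serializer = FileSourceSplitSerializer.INSTANCE;
    final byte[] bytes = serializer.serialize(split);   // the result is cached on the split afterwards
    final FileSourceSplit copy = serializer.deserialize(serializer.getVersion(), bytes);
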
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java
new file mode 100644
index 0000000..e1f6b68
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * State of the reader, essentially a mutable version of the {@link FileSourceSplit}.
+ * Has a modifiable offset and records-to-skip-count.
+ *
+ * <p>The {@link FileSourceSplit} assigned to the reader or stored in the checkpoint points to the
+ * position from where to start reading (after recovery), so the current offset and records-to-skip
+ * need to always point to the record after the last emitted record.
+ */
+@PublicEvolving
+public final class FileSourceSplitState {
+
+	private final FileSourceSplit split;
+
+	private long offset;
+
+	private long recordsToSkipAfterOffset;
+
+	public FileSourceSplitState(FileSourceSplit split) {
+		this.split = checkNotNull(split);
+
+		final Optional<CheckpointedPosition> readerPosition = split.getReaderPosition();
+		if (readerPosition.isPresent()) {
+			this.offset = readerPosition.get().getOffset();
+			this.recordsToSkipAfterOffset = readerPosition.get().getRecordsAfterOffset();
+		} else {
+			this.offset = CheckpointedPosition.NO_OFFSET;
+			this.recordsToSkipAfterOffset = 0L;
+		}
+	}
+
+	public long getOffset() {
+		return offset;
+	}
+
+	public long getRecordsToSkipAfterOffset() {
+		return recordsToSkipAfterOffset;
+	}
+
+	public void setOffset(long offset) {
+		// we skip sanity / boundary checks here for efficiency.
+		// illegal boundaries will eventually be caught when constructing the split on checkpoint.
+		this.offset = offset;
+	}
+
+	public void setRecordsToSkipAfterOffset(long recordsToSkipAfterOffset) {
+		// we skip sanity / boundary checks here for efficiency.
+		// illegal boundaries will eventually be caught when constructing the split on checkpoint.
+		this.recordsToSkipAfterOffset = recordsToSkipAfterOffset;
+	}
+
+	public void setPosition(long offset, long recordsToSkipAfterOffset) {
+		// we skip sanity / boundary checks here for efficiency.
+		// illegal boundaries will eventually be caught when constructing the split on checkpoint.
+		this.offset = offset;
+		this.recordsToSkipAfterOffset = recordsToSkipAfterOffset;
+	}
+
+	public void setPosition(CheckpointedPosition position) {
+		this.offset = position.getOffset();
+		this.recordsToSkipAfterOffset = position.getRecordsAfterOffset();
+	}
+
+	/**
+	 * Converts this state back into an immutable {@link FileSourceSplit}, encoding the current offset
+	 * and records-to-skip count as the split's reader position.
+	 */
+	public FileSourceSplit toFileSourceSplit() {
+		final CheckpointedPosition position =
+				(offset == CheckpointedPosition.NO_OFFSET && recordsToSkipAfterOffset == 0) ?
+						null : new CheckpointedPosition(offset, recordsToSkipAfterOffset);
+
+		return new FileSourceSplit(
+				split.splitId(),
+				split.path(),
+				split.offset(),
+				split.length(),
+				split.hostnames(),
+				position);
+	}
+}
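
The interplay between this mutable state and the immutable split can be sketched as follows (hedged, with made-up positions):

    final FileSourceSplitState state = new FileSourceSplitState(split);
    state.setPosition(2048L, 3L);   // resume at byte offset 2048, then skip 3 records
    final FileSourceSplit checkpointed = state.toFileSourceSplit();   // readerPosition is now set
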
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
new file mode 100644
index 0000000..1515690
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A checkpoint of the current enumerator state, containing the currently pending splits that are not yet assigned.
+ */
+@PublicEvolving
+public final class PendingSplitsCheckpoint {
+
+	/** The splits in the checkpoint. */
+	private final Collection<FileSourceSplit> splits;
+
+	/** The paths that are no longer in the enumerator checkpoint, but have been processed
+	 * before and should thus be ignored. Relevant only for sources in continuous monitoring mode. */
+	private final Collection<Path> alreadyProcessedPaths;
+
+	/** The cached byte representation from the last serialization step. This helps to avoid
+	 * paying repeated serialization cost for the same checkpoint object. This field is used
+	 * by {@link PendingSplitsCheckpointSerializer}. */
+	@Nullable
+	byte[] serializedFormCache;
+
+	private PendingSplitsCheckpoint(Collection<FileSourceSplit> splits, Collection<Path> alreadyProcessedPaths) {
+		this.splits = Collections.unmodifiableCollection(splits);
+		this.alreadyProcessedPaths = Collections.unmodifiableCollection(alreadyProcessedPaths);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public Collection<FileSourceSplit> getSplits() {
+		return splits;
+	}
+
+	public Collection<Path> getAlreadyProcessedPaths() {
+		return alreadyProcessedPaths;
+	}
+
+	// ------------------------------------------------------------------------
+	//  factories
+	// ------------------------------------------------------------------------
+
+	public static PendingSplitsCheckpoint fromCollectionSnapshot(Collection<FileSourceSplit> splits) {
+		checkNotNull(splits);
+
+		// create a copy of the collection to make sure this checkpoint is immutable
+		final Collection<FileSourceSplit> copy = new ArrayList<>(splits);
+		return new PendingSplitsCheckpoint(copy, Collections.emptySet());
+	}
+
+	public static PendingSplitsCheckpoint fromCollectionSnapshot(
+			Collection<FileSourceSplit> splits,
+			Collection<Path> alreadyProcessedPaths) {
+		checkNotNull(splits);
+
+		// create a copy of the collection to make sure this checkpoint is immutable
+		final Collection<FileSourceSplit> splitsCopy = new ArrayList<>(splits);
+		final Collection<Path> pathsCopy = new ArrayList<>(alreadyProcessedPaths);
+
+		return new PendingSplitsCheckpoint(splitsCopy, pathsCopy);
+	}
+
+	static PendingSplitsCheckpoint reusingCollection(Collection<FileSourceSplit> splits, Collection<Path> alreadyProcessedPaths) {
+		return new PendingSplitsCheckpoint(splits, alreadyProcessedPaths);
+	}
+}
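
A hedged sketch of how an enumerator might snapshot its pending work with the factory methods above (the split and path are the hypothetical ones from earlier examples):

    final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
            Collections.singletonList(split),                                // splits not yet assigned
            Collections.singletonList(new Path("/data/logs/part-0.txt")));   // paths already processed
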
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
new file mode 100644
index 0000000..e87ca7d
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A serializer for the {@link PendingSplitsCheckpoint}.
+ */
+@PublicEvolving
+public final class PendingSplitsCheckpointSerializer implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {
+
+	public static final PendingSplitsCheckpointSerializer INSTANCE = new PendingSplitsCheckpointSerializer();
+
+	private static final int VERSION = 1;
+
+	private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException {
+		checkArgument(checkpoint.getClass() == PendingSplitsCheckpoint.class,
+				"Cannot serialize subclasses of PendingSplitsCheckpoint");
+
+		// optimization: the checkpoint lazily caches its own serialized form
+		if (checkpoint.serializedFormCache != null) {
+			return checkpoint.serializedFormCache;
+		}
+
+		final FileSourceSplitSerializer serializer = FileSourceSplitSerializer.INSTANCE;
+		final Collection<FileSourceSplit> splits = checkpoint.getSplits();
+		final Collection<Path> processedPaths = checkpoint.getAlreadyProcessedPaths();
+
+		final ArrayList<byte[]> serializedSplits = new ArrayList<>(splits.size());
+		final ArrayList<byte[]> serializedPaths = new ArrayList<>(processedPaths.size());
+
+		int totalLen = 16; // four ints: magic, version, count splits, count paths
+
+		for (FileSourceSplit split : splits) {
+			final byte[] serSplit = serializer.serialize(split);
+			serializedSplits.add(serSplit);
+			totalLen += serSplit.length + 4; // 4 bytes for the length field
+		}
+
+		for (Path path : processedPaths) {
+			final byte[] serPath = path.toString().getBytes(StandardCharsets.UTF_8);
+			serializedPaths.add(serPath);
+			totalLen += serPath.length + 4; // 4 bytes for the length field
+		}
+
+		final byte[] result = new byte[totalLen];
+		final ByteBuffer byteBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
+		byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
+		byteBuffer.putInt(serializer.getVersion());
+		byteBuffer.putInt(serializedSplits.size());
+		byteBuffer.putInt(serializedPaths.size());
+
+		for (byte[] splitBytes : serializedSplits) {
+			byteBuffer.putInt(splitBytes.length);
+			byteBuffer.put(splitBytes);
+		}
+
+		for (byte[] pathBytes : serializedPaths) {
+			byteBuffer.putInt(pathBytes.length);
+			byteBuffer.put(pathBytes);
+		}
+
+		assert byteBuffer.remaining() == 0;
+
+		// optimization: cache the serialized form, so we avoid the byte work during repeated serialization
+		checkpoint.serializedFormCache = result;
+
+		return result;
+	}
+
+	@Override
+	public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) throws IOException {
+		if (version == 1) {
+			return deserializeV1(serialized);
+		}
+		throw new IOException("Unknown version: " + version);
+	}
+
+	private static PendingSplitsCheckpoint deserializeV1(byte[] serialized) throws IOException {
+		final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+		final int magic = bb.getInt();
+		if (magic != VERSION_1_MAGIC_NUMBER) {
+			throw new IOException(String.format("Invalid magic number for PendingSplitsCheckpoint. " +
+				"Expected: %X , found %X", VERSION_1_MAGIC_NUMBER, magic));
+		}
+
+		final int version = bb.getInt();
+		final int numSplits = bb.getInt();
+		final int numPaths = bb.getInt();
+
+		final FileSourceSplitSerializer serializer = FileSourceSplitSerializer.INSTANCE;
+		final ArrayList<FileSourceSplit> splits = new ArrayList<>(numSplits);
+		final ArrayList<Path> paths = new ArrayList<>(numPaths);
+
+		for (int remaining = numSplits; remaining > 0; remaining--) {
+			final byte[] bytes = new byte[bb.getInt()];
+			bb.get(bytes);
+			final FileSourceSplit split = serializer.deserialize(version, bytes);
+			splits.add(split);
+		}
+
+		for (int remaining = numPaths; remaining > 0; remaining--) {
+			final byte[] bytes = new byte[bb.getInt()];
+			bb.get(bytes);
+			final Path path = new Path(new String(bytes, StandardCharsets.UTF_8));
+			paths.add(path);
+		}
+
+		return PendingSplitsCheckpoint.reusingCollection(splits, paths);
+	}
+}
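
A short round-trip sketch (illustrative only) of the serializer above. It shows the usage contract;
the wire format is little-endian and consists of a magic number, the split-serializer version, two
counts, and length-prefixed split and path entries.

    import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
    import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;

    import java.io.IOException;

    class CheckpointSerdeSketch {
        static PendingSplitsCheckpoint roundTrip(PendingSplitsCheckpoint checkpoint) throws IOException {
            PendingSplitsCheckpointSerializer serializer = PendingSplitsCheckpointSerializer.INSTANCE;
            byte[] bytes = serializer.serialize(checkpoint); // also fills the serializedFormCache
            return serializer.deserialize(serializer.getVersion(), bytes);
        }
    }
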
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java
new file mode 100644
index 0000000..9182b61
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code FileSplitAssigner} is responsible for deciding what split should be processed next by
+ * which node. It determines split processing order and locality.
+ */
+@PublicEvolving
+public interface FileSplitAssigner {
+
+	/**
+	 * Gets the next split.
+	 *
+	 * <p>When this method returns an empty {@code Optional}, then the set of splits is
+	 * assumed to be done and the source will finish once the readers finished their current
+	 * splits.
+	 */
+	Optional<FileSourceSplit> getNext(@Nullable String hostname);
+
+	/**
+	 * Adds a set of splits to this assigner. This happens, for example, when some split processing
+	 * failed and the splits need to be re-added, or when new splits are discovered.
+	 */
+	void addSplits(Collection<FileSourceSplit> splits);
+
+	/**
+	 * Gets the remaining splits that this assigner has pending.
+	 */
+	Collection<FileSourceSplit> remainingSplits();
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory for the {@code FileSplitAssigner}, to allow the {@code FileSplitAssigner} to be eagerly
+	 * initialized and to not be serializable.
+	 */
+	@FunctionalInterface
+	interface Provider extends Serializable {
+
+		/**
+		 * Creates a new {@code FileSplitAssigner} that starts with the given set of initial splits.
+		 */
+		FileSplitAssigner create(Collection<FileSourceSplit> initialSplits);
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java
new file mode 100644
index 0000000..ed52f97
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code SimpleSplitAssigner} hands out splits in an arbitrary order, without any
+ * consideration for locality.
+ */
+@PublicEvolving
+public class SimpleSplitAssigner implements FileSplitAssigner {
+
+	private final ArrayList<FileSourceSplit> splits;
+
+	public SimpleSplitAssigner(Collection<FileSourceSplit> splits) {
+		this.splits = new ArrayList<>(splits);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Optional<FileSourceSplit> getNext(String hostname) {
+		final int size = splits.size();
+		return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
+	}
+
+	@Override
+	public void addSplits(Collection<FileSourceSplit> newSplits) {
+		splits.addAll(newSplits);
+	}
+
+	@Override
+	public Collection<FileSourceSplit> remainingSplits() {
+		return splits;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "SimpleSplitAssigner " + splits;
+	}
+}
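
Because FileSplitAssigner.Provider is only a serializable factory, a constructor reference is enough
to plug this assigner into a source. A sketch (the field name is illustrative):

    import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
    import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;

    class AssignerProviderSketch {
        // Only this serializable provider travels with the job graph; the assigner itself is
        // created later, on the enumerator side, from the initial set of splits.
        static final FileSplitAssigner.Provider SIMPLE_ASSIGNER = SimpleSplitAssigner::new;
    }
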
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java
new file mode 100644
index 0000000..14a732e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.compression;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory;
+import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.XZInputStreamFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A collection of common compression formats and de-compressors.
+ */
+@PublicEvolving
+public final class StandardDeCompressors {
+
+	/** All supported file compression formats, by common file extensions. */
+	private static final Map<String, InflaterInputStreamFactory<?>> DECOMPRESSORS =
+			buildDecompressorMap(
+					DeflateInflaterInputStreamFactory.getInstance(),
+					GzipInflaterInputStreamFactory.getInstance(),
+					Bzip2InputStreamFactory.getInstance(),
+					XZInputStreamFactory.getInstance());
+
+	/** All common file extensions of supported file compression formats. */
+	private static final Collection<String> COMMON_SUFFIXES =
+			Collections.unmodifiableList(new ArrayList<>(DECOMPRESSORS.keySet()));
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets all common file extensions of supported file compression formats.
+	 */
+	public static Collection<String> getCommonSuffixes() {
+		return COMMON_SUFFIXES;
+	}
+
+	/**
+	 * Gets the decompressor for a file extension. Returns null if there is no decompressor for this
+	 * file extension.
+	 */
+	@Nullable
+	public static InflaterInputStreamFactory<?> getDecompressorForExtension(String extension) {
+		return DECOMPRESSORS.get(extension);
+	}
+
+	/**
+	 * Gets the decompressor for a file name. This checks the file against all known and supported
+	 * file extensions.
+	 * Returns null if there is no decompressor for this file name.
+	 */
+	@Nullable
+	public static InflaterInputStreamFactory<?> getDecompressorForFileName(String fileName) {
+		for (final Map.Entry<String, InflaterInputStreamFactory<?>> entry : DECOMPRESSORS.entrySet()) {
+			if (fileName.endsWith(entry.getKey())) {
+				return entry.getValue();
+			}
+		}
+		return null;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static Map<String, InflaterInputStreamFactory<?>> buildDecompressorMap(
+			final InflaterInputStreamFactory<?>... decompressors) {
+
+		final LinkedHashMap<String, InflaterInputStreamFactory<?>> map = new LinkedHashMap<>(decompressors.length);
+		for (InflaterInputStreamFactory<?> decompressor : decompressors) {
+			for (String suffix : decompressor.getCommonFileExtensions()) {
+				map.put(suffix, decompressor);
+			}
+		}
+		return map;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class has purely static utility methods and is not meant to be instantiated. */
+	private StandardDeCompressors() {}
+}
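
A small sketch (illustrative only) of looking up a decompressor by file name and wrapping a raw
stream with it; it assumes the InflaterInputStreamFactory#create(InputStream) method from the
imported compression package:

    import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
    import org.apache.flink.connector.file.src.compression.StandardDeCompressors;

    import java.io.IOException;
    import java.io.InputStream;

    class DecompressionSketch {
        static InputStream maybeDecompress(String fileName, InputStream raw) throws IOException {
            InflaterInputStreamFactory<?> factory =
                    StandardDeCompressors.getDecompressorForFileName(fileName);
            // No matching suffix: hand the stream back unchanged.
            return factory == null ? raw : factory.create(raw);
        }
    }
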
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java
new file mode 100644
index 0000000..f475a68
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively,
+ * and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not have multiple splits created per file; each such
+ * file becomes a single source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2', ...)
+ * will not be split. See {@link StandardDeCompressors} for a list of known formats and suffixes.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file prefixes
+ * '.' and '_'. A custom file filter can be specified.
+ */
+@PublicEvolving
+public class BlockSplittingRecursiveEnumerator extends NonSplittingRecursiveEnumerator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BlockSplittingRecursiveEnumerator.class);
+
+	private final String[] nonSplittableFileSuffixes;
+
+	/**
+	 * Creates a new enumerator that enumerates all files except hidden files.
+	 * Hidden files are considered files where the filename starts with '.' or with '_'.
+	 *
+	 * <p>The enumerator does not split files that have a suffix corresponding to a known
+	 * compression format (for example '.gzip', '.bz2', '.xz', '.zip', ...).
+	 * See {@link StandardDeCompressors} for details.
+	 */
+	public BlockSplittingRecursiveEnumerator() {
+		this(new DefaultFileFilter(), StandardDeCompressors.getCommonSuffixes().toArray(new String[0]));
+	}
+
+	/**
+	 * Creates a new enumerator that uses the given predicate as a filter
+	 * for file paths, and avoids splitting files with the given extensions (typically
+	 * to avoid splitting compressed files).
+	 */
+	public BlockSplittingRecursiveEnumerator(
+			final Predicate<Path> fileFilter,
+			final String[] nonSplittableFileSuffixes) {
+		super(fileFilter);
+		this.nonSplittableFileSuffixes = checkNotNull(nonSplittableFileSuffixes);
+	}
+
+	protected void convertToSourceSplits(
+			final FileStatus file,
+			final FileSystem fs,
+			final List<FileSourceSplit> target) throws IOException {
+
+		if (!isFileSplittable(file.getPath())) {
+			super.convertToSourceSplits(file, fs, target);
+			return;
+		}
+
+		final BlockLocation[] blocks = getBlockLocationsForFile(file, fs);
+		if (blocks == null) {
+			target.add(new FileSourceSplit(getNextId(), file.getPath(), 0L, file.getLen()));
+		} else {
+			for (BlockLocation block : blocks) {
+				target.add(new FileSourceSplit(
+						getNextId(),
+						file.getPath(),
+						block.getOffset(),
+						block.getLength(),
+						block.getHosts()));
+			}
+		}
+	}
+
+	protected boolean isFileSplittable(Path filePath) {
+		if (nonSplittableFileSuffixes.length == 0) {
+			return true;
+		}
+
+		final String path = filePath.getPath();
+		for (String suffix : nonSplittableFileSuffixes) {
+			if (path.endsWith(suffix)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Nullable
+	private static BlockLocation[] getBlockLocationsForFile(FileStatus file, FileSystem fs) throws IOException {
+		final long len = file.getLen();
+
+		final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
+		if (blocks == null || blocks.length == 0) {
+			return null;
+		}
+
+		// a cheap check whether we have all blocks. we don't check whether the blocks fully cover the
+		// file (too expensive) but do some sanity checks to catch early the common cases where incorrect
+		// block info is returned by the file system implementation.
+
+		long totalLen = 0L;
+		for (BlockLocation block : blocks) {
+			totalLen += block.getLength();
+		}
+		if (totalLen != len) {
+			LOG.warn("Block lengths do not match file length for {}. File length is {}, blocks are {}",
+					file.getPath(), len, Arrays.toString(blocks));
+			return null;
+		}
+
+		return blocks;
+	}
+}
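
A sketch of instantiating the enumerator with a custom file filter and a custom list of
non-splittable suffixes; the ".parquet" filter and ".zip" suffix are examples, not defaults:

    import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
    import org.apache.flink.connector.file.src.enumerate.DefaultFileFilter;
    import org.apache.flink.core.fs.Path;

    import java.util.function.Predicate;

    class BlockSplittingSketch {
        static BlockSplittingRecursiveEnumerator parquetOnly() {
            // Keep non-hidden files ending in ".parquet"; never split ".zip" archives into blocks.
            Predicate<Path> filter =
                    new DefaultFileFilter().and(path -> path.getName().endsWith(".parquet"));
            return new BlockSplittingRecursiveEnumerator(filter, new String[] {".zip"});
        }
    }
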
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java
new file mode 100644
index 0000000..7ac7d46
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.util.function.Predicate;
+
+/**
+ * A file filter that filters out hidden files based on common naming patterns,
+ * i.e., files where the filename starts with '.' or with '_'.
+ */
+@PublicEvolving
+public final class DefaultFileFilter implements Predicate<Path> {
+
+	@Override
+	public boolean test(Path path) {
+		final String fileName = path.getName();
+		if (fileName == null || fileName.length() == 0) {
+			return true;
+		}
+		final char first = fileName.charAt(0);
+		return first != '.' && first != '_';
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java
new file mode 100644
index 0000000..67c2790
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The {@code FileEnumerator}'s task is to discover all files to be read and to split them into
+ * a set of {@link FileSourceSplit}.
+ *
+ * <p>This possibly includes path traversals, file filtering (by name or other patterns), and
+ * deciding whether and how to split files into multiple splits.
+ */
+@PublicEvolving
+public interface FileEnumerator {
+
+	/**
+	 * Generates all file splits for the relevant files under the given paths.
+	 * The {@code minDesiredSplits} is an optional hint indicating how many splits would be necessary
+	 * to exploit parallelism properly.
+	 */
+	Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory for the {@code FileEnumerator}, to allow the {@code FileEnumerator} to be eagerly
+	 * initialized and to not be serializable.
+	 */
+	@FunctionalInterface
+	interface Provider extends Serializable {
+
+		FileEnumerator create();
+	}
+}
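
The Provider above exists so that only a serializable factory is shipped with the job graph. A
sketch using the non-splitting enumerator introduced in the next file:

    import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
    import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;

    class EnumeratorProviderSketch {
        // Created fresh when the split enumerator starts; the FileEnumerator instance itself
        // never needs to be serializable.
        static final FileEnumerator.Provider NON_SPLITTING = NonSplittingRecursiveEnumerator::new;
    }
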
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java
new file mode 100644
index 0000000..a9a06f3
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively.
+ * Each file becomes one split; this enumerator does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file prefixes
+ * '.' and '_'. A custom file filter can be specified.
+ */
+@PublicEvolving
+public class NonSplittingRecursiveEnumerator implements FileEnumerator {
+
+	/** The filter predicate to filter out unwanted files. */
+	private final Predicate<Path> fileFilter;
+
+	/** The current Id as a mutable string representation. This covers more values than the
+	 * integer value range, so we should never overflow. */
+	private final char[] currentId = "0000000000".toCharArray();
+
+	/**
+	 * Creates a NonSplittingRecursiveEnumerator that enumerates all files except hidden files.
+	 * Hidden files are considered files where the filename starts with '.' or with '_'.
+	 */
+	public NonSplittingRecursiveEnumerator() {
+		this(new DefaultFileFilter());
+	}
+
+	/**
+	 * Creates a NonSplittingRecursiveEnumerator that uses the given predicate as a filter
+	 * for file paths.
+	 */
+	public NonSplittingRecursiveEnumerator(Predicate<Path> fileFilter) {
+		this.fileFilter = checkNotNull(fileFilter);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException {
+		final ArrayList<FileSourceSplit> splits = new ArrayList<>();
+
+		for (Path path : paths) {
+			final FileSystem fs = path.getFileSystem();
+			final FileStatus status = fs.getFileStatus(path);
+			addSplitsForPath(status, fs, splits);
+		}
+
+		return splits;
+	}
+
+	private void addSplitsForPath(FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target) throws IOException {
+		if (!fileFilter.test(fileStatus.getPath())) {
+			return;
+		}
+
+		if (!fileStatus.isDir()) {
+			convertToSourceSplits(fileStatus, fs, target);
+			return;
+		}
+
+		final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
+		for (FileStatus containedStatus : containedFiles) {
+			addSplitsForPath(containedStatus, fs, target);
+		}
+	}
+
+	protected void convertToSourceSplits(
+			final FileStatus file,
+			final FileSystem fs,
+			final List<FileSourceSplit> target) throws IOException {
+
+		final String[] hosts = getHostsFromBlockLocations(fs.getFileBlockLocations(file, 0L, file.getLen()));
+		target.add(new FileSourceSplit(getNextId(), file.getPath(), 0, file.getLen(), hosts));
+	}
+
+	protected final String getNextId() {
+		// because we just increment numbers, we increment the char representation directly,
+		// rather than incrementing an integer and converting it back to a string representation
+		// every time (which would require fairly expensive conversion logic).
+		incrementCharArrayByOne(currentId, currentId.length - 1);
+		return new String(currentId);
+	}
+
+	private static String[] getHostsFromBlockLocations(BlockLocation[] blockLocations) throws IOException {
+		if (blockLocations.length == 0) {
+			return StringUtils.EMPTY_STRING_ARRAY;
+		}
+		if (blockLocations.length == 1) {
+			return blockLocations[0].getHosts();
+		}
+		final LinkedHashSet<String> allHosts = new LinkedHashSet<>();
+		for (BlockLocation block : blockLocations) {
+			allHosts.addAll(Arrays.asList(block.getHosts()));
+		}
+		return allHosts.toArray(new String[allHosts.size()]);
+	}
+
+	private static void incrementCharArrayByOne(char[] array, int pos) {
+		char c = array[pos];
+		c++;
+
+		if (c > '9') {
+			c = '0';
+			incrementCharArrayByOne(array, pos - 1);
+		}
+		array[pos] = c;
+	}
+}
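
A sketch of running the enumerator directly, for example in a test. Each non-hidden file under the
directory becomes exactly one split; this implementation ignores the parallelism hint:

    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
    import org.apache.flink.core.fs.Path;

    import java.io.IOException;
    import java.util.Collection;

    class EnumerateDirectorySketch {
        static Collection<FileSourceSplit> listSplits(Path directory) throws IOException {
            return new NonSplittingRecursiveEnumerator().enumerateSplits(new Path[] {directory}, 1);
        }
    }
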
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
new file mode 100644
index 0000000..72fa0ad
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A continuously monitoring enumerator: it periodically re-enumerates the given paths and
+ * assigns splits for newly discovered files, ignoring files that were already processed.
+ */
+@Internal
+public class ContinuousFileSplitEnumerator implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
+
+	private final SplitEnumeratorContext<FileSourceSplit> context;
+
+	private final FileSplitAssigner splitAssigner;
+
+	private final FileEnumerator enumerator;
+
+	private final HashSet<Path> pathsAlreadyProcessed;
+
+	private final LinkedHashMap<Integer, String> readersAwaitingSplit;
+
+	private final Path[] paths;
+
+	private final long discoveryInterval;
+
+	// ------------------------------------------------------------------------
+
+	public ContinuousFileSplitEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> context,
+			FileEnumerator enumerator,
+			FileSplitAssigner splitAssigner,
+			Path[] paths,
+			Collection<Path> alreadyDiscoveredPaths,
+			long discoveryInterval) {
+
+		checkArgument(discoveryInterval > 0L);
+		this.context = checkNotNull(context);
+		this.enumerator = checkNotNull(enumerator);
+		this.splitAssigner = checkNotNull(splitAssigner);
+		this.paths = paths;
+		this.discoveryInterval = discoveryInterval;
+		this.pathsAlreadyProcessed = new HashSet<>(alreadyDiscoveredPaths);
+		this.readersAwaitingSplit = new LinkedHashMap<>();
+	}
+
+	@Override
+	public void start() {
+		context.callAsync(
+			() -> enumerator.enumerateSplits(paths, 1),
+			this::processDiscoveredSplits,
+			discoveryInterval, discoveryInterval);
+	}
+
+	@Override
+	public void close() throws IOException {
+		// no resources to close
+	}
+
+	@Override
+	public void addReader(int subtaskId) {
+		// this source is purely lazy-pull-based, nothing to do upon registration
+	}
+
+	@Override
+	public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+		if (sourceEvent instanceof RequestSplitEvent) {
+			readersAwaitingSplit.put(subtaskId, ((RequestSplitEvent) sourceEvent).hostName());
+			assignSplits();
+		}
+		else {
+			LOG.error("Received unrecognized event: {}", sourceEvent);
+		}
+	}
+
+	@Override
+	public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+		LOG.debug("File Source Enumerator adds splits back: {}", splits);
+		splitAssigner.addSplits(splits);
+	}
+
+	@Override
+	public PendingSplitsCheckpoint snapshotState() throws Exception {
+		return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits(), pathsAlreadyProcessed);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error) {
+		if (error != null) {
+			LOG.error("Failed to enumerate files", error);
+			return;
+		}
+
+		final Collection<FileSourceSplit> newSplits = splits.stream()
+				.filter((split) -> pathsAlreadyProcessed.add(split.path()))
+				.collect(Collectors.toList());
+		splitAssigner.addSplits(newSplits);
+
+		assignSplits();
+	}
+
+	private void assignSplits() {
+		final Iterator<Map.Entry<Integer, String>> awaitingReader = readersAwaitingSplit.entrySet().iterator();
+
+		while (awaitingReader.hasNext()) {
+			final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
+			final String hostname = nextAwaiting.getValue();
+			final int awaitingSubtask = nextAwaiting.getKey();
+			final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
+			if (nextSplit.isPresent()) {
+				context.assignSplit(nextSplit.get(), awaitingSubtask);
+				awaitingReader.remove();
+			} else {
+				break;
+			}
+		}
+	}
+}
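
A sketch of restoring this enumerator from a checkpoint. The surrounding source wiring is
hypothetical, but the constructor arguments match the class above: pending splits seed the
assigner, already-processed paths seed the de-duplication set.

    import org.apache.flink.api.connector.source.SplitEnumeratorContext;
    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
    import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
    import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
    import org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator;
    import org.apache.flink.core.fs.Path;

    class ContinuousRestoreSketch {
        static ContinuousFileSplitEnumerator restore(
                SplitEnumeratorContext<FileSourceSplit> context,
                PendingSplitsCheckpoint checkpoint,
                Path[] paths,
                long discoveryIntervalMillis) {
            return new ContinuousFileSplitEnumerator(
                    context,
                    new NonSplittingRecursiveEnumerator(),
                    new SimpleSplitAssigner(checkpoint.getSplits()),
                    paths,
                    checkpoint.getAlreadyProcessedPaths(),
                    discoveryIntervalMillis);
        }
    }
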
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java
new file mode 100644
index 0000000..e084926
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.IteratorResultIterator;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code FileRecordFormatAdapter} turns a {@link FileRecordFormat} into a {@link BulkFormat}.
+ */
+@Internal
+public final class FileRecordFormatAdapter<T> implements BulkFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final FileRecordFormat<T> fileFormat;
+
+	public FileRecordFormatAdapter(FileRecordFormat<T> fileFormat) {
+		this.fileFormat = checkNotNull(fileFormat);
+	}
+
+	@Override
+	public BulkFormat.Reader<T> createReader(
+			final Configuration config,
+			final Path filePath,
+			final long splitOffset,
+			final long splitLength) throws IOException {
+
+		final FileRecordFormat.Reader<T> reader = fileFormat.createReader(config, filePath, splitOffset, splitLength);
+		return doWithCleanupOnException(reader, () -> {
+			//noinspection CodeBlock2Expr
+			return wrapReader(reader, config, CheckpointedPosition.NO_OFFSET, 0L);
+		});
+	}
+
+	@Override
+	public BulkFormat.Reader<T> restoreReader(
+			final Configuration config,
+			final Path filePath,
+			final long splitOffset,
+			final long splitLength,
+			final CheckpointedPosition checkpointedPosition) throws IOException {
+
+		final FileRecordFormat.Reader<T> reader = checkpointedPosition.getOffset() == CheckpointedPosition.NO_OFFSET
+				? fileFormat.createReader(config, filePath, splitOffset, splitLength)
+				: fileFormat.restoreReader(config, filePath, checkpointedPosition.getOffset(), splitOffset, splitLength);
+
+		return doWithCleanupOnException(reader, () -> {
+			long remaining = checkpointedPosition.getRecordsAfterOffset();
+			while (remaining > 0 && reader.read() != null) {
+				remaining--;
+			}
+
+			return wrapReader(reader, config, checkpointedPosition.getOffset(), checkpointedPosition.getRecordsAfterOffset());
+		});
+	}
+
+	@Override
+	public boolean isSplittable() {
+		return fileFormat.isSplittable();
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return fileFormat.getProducedType();
+	}
+
+	private static <T> Reader<T> wrapReader(
+			final FileRecordFormat.Reader<T> reader,
+			final Configuration config,
+			final long startingOffset,
+			final long startingSkipCount) {
+
+		final int numRecordsPerBatch = config.get(FileRecordFormat.RECORDS_PER_FETCH);
+		return new Reader<>(reader, numRecordsPerBatch, startingOffset, startingSkipCount);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The reader adapter, from {@link FileRecordFormat.Reader} to {@link BulkFormat.Reader}.
+	 */
+	public static final class Reader<T> implements BulkFormat.Reader<T> {
+
+		private final FileRecordFormat.Reader<T> reader;
+		private final int numRecordsPerBatch;
+
+		private long lastOffset;
+		private long lastRecordsAfterOffset;
+
+		Reader(
+				final FileRecordFormat.Reader<T> reader,
+				final int numRecordsPerBatch,
+				final long initialOffset,
+				final long initialSkipCount) {
+			checkArgument(numRecordsPerBatch > 0, "numRecordsPerBatch must be > 0");
+			this.reader = checkNotNull(reader);
+			this.numRecordsPerBatch = numRecordsPerBatch;
+			this.lastOffset = initialOffset;
+			this.lastRecordsAfterOffset = initialSkipCount;
+		}
+
+		@Nullable
+		@Override
+		public RecordIterator<T> readBatch() throws IOException {
+			updateCheckpointablePosition();
+
+			final ArrayList<T> result = new ArrayList<>(numRecordsPerBatch);
+			T next;
+			int remaining = numRecordsPerBatch;
+			while (remaining-- > 0 && ((next = reader.read()) != null)) {
+				result.add(next);
+			}
+
+			if (result.isEmpty()) {
+				return null;
+			}
+
+			final RecordIterator<T> iter = new IteratorResultIterator<>(result.iterator(), lastOffset, lastRecordsAfterOffset);
+			lastRecordsAfterOffset += result.size();
+			return iter;
+		}
+
+		@Override
+		public void close() throws IOException {
+			reader.close();
+		}
+
+		private void updateCheckpointablePosition() {
+			final CheckpointedPosition position = reader.getCheckpointedPosition();
+			if (position != null) {
+				this.lastOffset = position.getOffset();
+				this.lastRecordsAfterOffset = position.getRecordsAfterOffset();
+			}
+		}
+	}
+}
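
A sketch of adapting a record-at-a-time format into the bulk interface used by the reader; the
batch size per readBatch() call is taken from FileRecordFormat.RECORDS_PER_FETCH:

    import org.apache.flink.connector.file.src.impl.FileRecordFormatAdapter;
    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.connector.file.src.reader.FileRecordFormat;

    class RecordFormatAdapterSketch {
        static <T> BulkFormat<T> asBulkFormat(FileRecordFormat<T> recordFormat) {
            // The adapter groups single-record reads into batches and tracks the checkpointed
            // position (offset plus records-after-offset) for recovery.
            return new FileRecordFormatAdapter<>(recordFormat);
        }
    }
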
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java
new file mode 100644
index 0000000..b3d2157f
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A collection of records for one file split.
+ *
+ * <p>This is essentially a slim wrapper around the {@link BulkFormat.RecordIterator} that only
+ * adds information about the current split, or finished splits (to keep knowledge about current split
+ * IDs out of the reader formats).
+ */
+@Internal
+public final class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+	@Nullable
+	private String splitId;
+
+	@Nullable
+	private BulkFormat.RecordIterator<T> recordsForSplitCurrent;
+
+	@Nullable
+	private final BulkFormat.RecordIterator<T> recordsForSplit;
+
+	private final Set<String> finishedSplits;
+
+	private FileRecords(
+			@Nullable String splitId,
+			@Nullable BulkFormat.RecordIterator<T> recordsForSplit,
+			Set<String> finishedSplits) {
+
+		this.splitId = splitId;
+		this.recordsForSplit = recordsForSplit;
+		this.finishedSplits = finishedSplits;
+	}
+
+	@Nullable
+	@Override
+	public String nextSplit() {
+		// move the split ID forward (from its current value to null)
+		final String nextSplit = this.splitId;
+		this.splitId = null;
+
+		// move the iterator, from null to value (if first move) or to null (if second move)
+		this.recordsForSplitCurrent = nextSplit != null ? this.recordsForSplit : null;
+
+		return nextSplit;
+	}
+
+	@Nullable
+	@Override
+	public RecordAndPosition<T> nextRecordFromSplit() {
+		final BulkFormat.RecordIterator<T> recordsForSplit = this.recordsForSplitCurrent;
+		if (recordsForSplit != null) {
+			return recordsForSplit.next();
+		} else {
+			throw new IllegalStateException();
+		}
+	}
+
+	@Override
+	public void recycle() {
+		if (recordsForSplit != null) {
+			recordsForSplit.releaseBatch();
+		}
+	}
+
+	@Override
+	public Set<String> finishedSplits() {
+		return finishedSplits;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static <T> FileRecords<T> forRecords(
+			final String splitId,
+			final BulkFormat.RecordIterator<T> recordsForSplit) {
+		return new FileRecords<>(splitId, recordsForSplit, Collections.emptySet());
+	}
+
+	public static <T> FileRecords<T> finishedSplit(String splitId) {
+		return new FileRecords<>(null, null, Collections.singleton(splitId));
+	}
+}
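
The nextSplit()/nextRecordFromSplit() protocol is easiest to see from the consumer side. A sketch
of how a caller walks one of these batches (the loop body is illustrative):

    import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
    import org.apache.flink.connector.file.src.util.RecordAndPosition;

    class FileRecordsConsumerSketch {
        static <T> void drain(RecordsWithSplitIds<RecordAndPosition<T>> batch) {
            String splitId;
            while ((splitId = batch.nextSplit()) != null) {
                RecordAndPosition<T> record;
                while ((record = batch.nextRecordFromSplit()) != null) {
                    // emit record.getRecord(); remember record.getOffset() and
                    // record.getRecordSkipCount() as the restart position for splitId
                }
            }
            batch.recycle(); // releases the underlying BulkFormat batch
        }
    }
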
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
new file mode 100644
index 0000000..08c965e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.FileSourceSplitState;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import java.util.Collection;
+
+/**
+ * A {@link SourceReader} that reads records from {@link FileSourceSplit}s.
+ */
+@Internal
+public final class FileSourceReader<T>
+		extends SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, FileSourceSplit, FileSourceSplitState> {
+
+	public FileSourceReader(SourceReaderContext readerContext, BulkFormat<T> readerFormat, Configuration config) {
+		super(
+			() -> new FileSourceSplitReader<>(config, readerFormat),
+			new FileSourceRecordEmitter<>(),
+			config,
+			readerContext);
+	}
+
+	@Override
+	public void start() {
+		requestSplit();
+	}
+
+	@Override
+	protected void onSplitFinished(Collection<String> finishedSplitIds) {
+		requestSplit();
+	}
+
+	@Override
+	protected FileSourceSplitState initializedState(FileSourceSplit split) {
+		return new FileSourceSplitState(split);
+	}
+
+	@Override
+	protected FileSourceSplit toSplitType(String splitId, FileSourceSplitState splitState) {
+		return splitState.toFileSourceSplit();
+	}
+
+	private void requestSplit() {
+		context.sendSourceEventToCoordinator(new RequestSplitEvent(context.getLocalHostName()));
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java
new file mode 100644
index 0000000..5f55159
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.FileSourceSplitState;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+/**
+ * The {@link RecordEmitter} implementation for {@link FileSourceReader}.
+ *
+ * <p>This updates the {@link FileSourceSplitState} for every emitted record.
+ * Because the {@link FileSourceSplit} points to the position from where to start reading (after recovery),
+ * the current offset and records-to-skip must always point to the record right after the emitted record.
+ */
+@Internal
+final class FileSourceRecordEmitter<T> implements RecordEmitter<RecordAndPosition<T>, T, FileSourceSplitState> {
+
+	@Override
+	public void emitRecord(
+			final RecordAndPosition<T> element,
+			final SourceOutput<T> output,
+			final FileSourceSplitState splitState) {
+
+		output.collect(element.getRecord());
+		splitState.setPosition(element.getOffset(), element.getRecordSkipCount());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java
new file mode 100644
index 0000000..ed9159b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * The {@link SplitReader} implementation for the file source.
+ */
+@Internal
+final class FileSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, FileSourceSplit> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileSourceSplitReader.class);
+
+	private final Configuration config;
+	private final BulkFormat<T> readerFactory;
+
+	private final Queue<FileSourceSplit> splits;
+
+	@Nullable
+	private BulkFormat.Reader<T> currentReader;
+	@Nullable
+	private String currentSplitId;
+
+	public FileSourceSplitReader(Configuration config, BulkFormat<T> readerFactory) {
+		this.config = config;
+		this.readerFactory = readerFactory;
+		this.splits = new ArrayDeque<>();
+	}
+
+	@Override
+	public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
+		checkSplitOrStartNext();
+
+		final BulkFormat.RecordIterator<T> nextBatch = currentReader.readBatch();
+		return nextBatch == null ? finishSplit() : FileRecords.forRecords(currentSplitId, nextBatch);
+	}
+
+	@Override
+	public void handleSplitsChanges(final SplitsChange<FileSourceSplit> splitChange) {
+		if (!(splitChange instanceof SplitsAddition)) {
+			throw new UnsupportedOperationException(String.format(
+					"The SplitChange type of %s is not supported.", splitChange.getClass()));
+		}
+
+		LOG.debug("Handling split change {}", splitChange);
+		splits.addAll(splitChange.splits());
+	}
+
+	@Override
+	public void wakeUp() {}
+
+	private void checkSplitOrStartNext() throws IOException {
+		if (currentReader != null) {
+			return;
+		}
+
+		final FileSourceSplit nextSplit = splits.poll();
+		if (nextSplit == null) {
+			throw new IOException("Cannot fetch from another split - no split remaining");
+		}
+
+		currentSplitId = nextSplit.splitId();
+
+		final Optional<CheckpointedPosition> position = nextSplit.getReaderPosition();
+		currentReader = position.isPresent()
+				? readerFactory.restoreReader(config, nextSplit.path(), nextSplit.offset(), nextSplit.length(), position.get())
+				: readerFactory.createReader(config, nextSplit.path(), nextSplit.offset(), nextSplit.length());
+	}
+
+	private FileRecords<T> finishSplit() throws IOException {
+		if (currentReader != null) {
+			currentReader.close();
+			currentReader = null;
+		}
+
+		final FileRecords<T> finishRecords = FileRecords.finishedSplit(currentSplitId);
+		currentSplitId = null;
+		return finishRecords;
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
new file mode 100644
index 0000000..05d7897
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A SplitEnumerator implementation for bounded / batch {@link FileSource} input.
+ *
+ * <p>This enumerator takes all files that are present in the configured input directories and assigns
+ * them to the readers. Once all files are processed, the source is finished.
+ *
+ * <p>The implementation of this class is rather thin. The actual logic for creating the set of
+ * FileSourceSplits to process, and the logic to decide which reader gets what split, are in
+ * {@link FileEnumerator} and in {@link FileSplitAssigner}, respectively.
+ */
+@Internal
+public class StaticFileSplitEnumerator implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StaticFileSplitEnumerator.class);
+
+	private final SplitEnumeratorContext<FileSourceSplit> context;
+
+	private final FileSplitAssigner splitAssigner;
+
+	// ------------------------------------------------------------------------
+
+	public StaticFileSplitEnumerator(
+			SplitEnumeratorContext<FileSourceSplit> context,
+			FileSplitAssigner splitAssigner) {
+		this.context = checkNotNull(context);
+		this.splitAssigner = checkNotNull(splitAssigner);
+	}
+
+	@Override
+	public void start() {
+		// no resources to start
+	}
+
+	@Override
+	public void close() throws IOException {
+		// no resources to close
+	}
+
+	@Override
+	public void addReader(int subtaskId) {
+		// this source is purely lazy-pull-based, nothing to do upon registration
+	}
+
+	@Override
+	public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+		if (sourceEvent instanceof RequestSplitEvent) {
+			final RequestSplitEvent requestSplitEvent = (RequestSplitEvent) sourceEvent;
+			assignNextEvents(subtaskId, requestSplitEvent.hostName());
+		}
+		else {
+			LOG.error("Received unrecognized event: {}", sourceEvent);
+		}
+	}
+
+	@Override
+	public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+		LOG.debug("File Source Enumerator adds splits back: {}", splits);
+		splitAssigner.addSplits(splits);
+	}
+
+	@Override
+	public PendingSplitsCheckpoint snapshotState() {
+		return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private void assignNextEvents(int subtask, @Nullable String hostname) {
+		if (LOG.isInfoEnabled()) {
+			final String hostInfo = hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
+			LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
+		}
+
+		final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
+		if (nextSplit.isPresent()) {
+			final FileSourceSplit split = nextSplit.get();
+			context.assignSplit(split, subtask);
+			LOG.info("Assigned split to subtask {} : {}", subtask, split);
+		}
+		else {
+			context.sendEventToSourceReader(subtask, new NoMoreSplitsEvent());
+			LOG.info("No more splits available for subtask {}", subtask);
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java
new file mode 100644
index 0000000..e085a0e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.IteratorResultIterator;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.MathUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter to turn a {@link StreamFormat} into a {@link BulkFormat}.
+ */
+@Internal
+public final class StreamFormatAdapter<T> implements BulkFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final StreamFormat<T> streamFormat;
+
+	public StreamFormatAdapter(StreamFormat<T> streamFormat) {
+		this.streamFormat = checkNotNull(streamFormat);
+	}
+
+	@Override
+	public BulkFormat.Reader<T> createReader(
+			final Configuration config,
+			final Path filePath,
+			final long splitOffset,
+			final long splitLength) throws IOException {
+
+		final TrackingFsDataInputStream trackingStream = openStream(filePath, config, splitOffset);
+
+		return doWithCleanupOnException(trackingStream, () -> {
+			final StreamFormat.Reader<T> streamReader = streamFormat.createReader(
+					config, trackingStream, trackingStream.getFileLength(), splitOffset + splitLength);
+			return new Reader<>(streamReader, trackingStream, CheckpointedPosition.NO_OFFSET, 0L);
+		});
+	}
+
+	@Override
+	public BulkFormat.Reader<T> restoreReader(
+			final Configuration config,
+			final Path filePath,
+			final long splitOffset,
+			final long splitLength,
+			final CheckpointedPosition checkpointedPosition) throws IOException {
+
+		final TrackingFsDataInputStream trackingStream = openStream(filePath, config, splitOffset);
+
+		return doWithCleanupOnException(trackingStream, () -> {
+			// if there never was a checkpointed offset yet, we need to initialize the reader like a fresh reader.
+			// see the JavaDocs on StreamFormat.restoreReader() for details
+			final StreamFormat.Reader<T> streamReader = checkpointedPosition.getOffset() == CheckpointedPosition.NO_OFFSET
+					? streamFormat.createReader(
+							config, trackingStream, trackingStream.getFileLength(), splitOffset + splitLength)
+					: streamFormat.restoreReader(
+							config, trackingStream, checkpointedPosition.getOffset(),
+							trackingStream.getFileLength(), splitOffset + splitLength);
+
+			// skip the records to skip, but make sure we close the reader if something goes wrong
+			doWithCleanupOnException(streamReader, () -> {
+				long toSkip = checkpointedPosition.getRecordsAfterOffset();
+				while (toSkip > 0 && streamReader.read() != null) {
+					toSkip--;
+				}
+			});
+
+			return new Reader<>(
+					streamReader, trackingStream,
+					checkpointedPosition.getOffset(), checkpointedPosition.getRecordsAfterOffset());
+		});
+	}
+
+	@Override
+	public boolean isSplittable() {
+		return streamFormat.isSplittable();
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return streamFormat.getProducedType();
+	}
+
+	private static TrackingFsDataInputStream openStream(
+			final Path file,
+			final Configuration config,
+			final long seekPosition) throws IOException {
+
+		final FileSystem fs = file.getFileSystem();
+		final long fileLength = fs.getFileStatus(file).getLen();
+
+		final int fetchSize = MathUtils.checkedDownCast(config.get(StreamFormat.FETCH_IO_SIZE).getBytes());
+		if (fetchSize <= 0) {
+			throw new IllegalConfigurationException(
+					String.format("The fetch size (%s) must be > 0, but is %d",
+							StreamFormat.FETCH_IO_SIZE.key(), fetchSize));
+		}
+
+		final FSDataInputStream inStream = fs.open(file);
+		return doWithCleanupOnException(inStream, () -> {
+			inStream.seek(seekPosition);
+			return new TrackingFsDataInputStream(inStream, fileLength, fetchSize);
+		});
+
+	}
+
+	// ----------------------------------------------------------------------------------
+
+	/**
+	 * The reader adapter, from {@link StreamFormat.Reader} to {@link BulkFormat.Reader}.
+	 */
+	public static final class Reader<T> implements BulkFormat.Reader<T> {
+
+		private final StreamFormat.Reader<T> reader;
+		private final TrackingFsDataInputStream stream;
+		private long lastOffset;
+		private long lastRecordsAfterOffset;
+
+		Reader(
+				final StreamFormat.Reader<T> reader,
+				final TrackingFsDataInputStream stream,
+				final long initialOffset,
+				final long initialSkipCount) {
+
+			this.reader = checkNotNull(reader);
+			this.stream = checkNotNull(stream);
+			this.lastOffset = initialOffset;
+			this.lastRecordsAfterOffset = initialSkipCount;
+		}
+
+		@Nullable
+		@Override
+		public RecordIterator<T> readBatch() throws IOException {
+			updateCheckpointedPosition();
+			stream.newBatch();
+
+			final ArrayList<T> result = new ArrayList<>();
+			T next;
+			while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
+				result.add(next);
+			}
+
+			if (result.isEmpty()) {
+				return null;
+			}
+
+			final RecordIterator<T> iter = new IteratorResultIterator<>(
+					result.iterator(), lastOffset, lastRecordsAfterOffset);
+			lastRecordsAfterOffset += result.size();
+			return iter;
+		}
+
+		@Override
+		public void close() throws IOException {
+			try {
+				reader.close();
+			} finally {
+				// this is just in case, to guard against resource leaks
+				IOUtils.closeQuietly(stream);
+			}
+		}
+
+		private void updateCheckpointedPosition() {
+			final CheckpointedPosition position = reader.getCheckpointedPosition();
+			if (position != null) {
+				this.lastOffset = position.getOffset();
+				this.lastRecordsAfterOffset = position.getRecordsAfterOffset();
+			}
+		}
+	}
+
+	// ----------------------------------------------------------------------------------
+
+	/**
+	 * Utility stream that tracks how much has been read. This is used to decide when the reader
+	 * should finish the current batch and start the next batch. That way we make the batch sizes
+	 * dependent on the consumed data volume, which is more robust than making them dependent on a
+	 * record count.
+	 */
+	private static final class TrackingFsDataInputStream extends FSDataInputStream {
+
+		private final FSDataInputStream stream;
+		private final long fileLength;
+		private final int batchSize;
+		private int remainingInBatch;
+
+		TrackingFsDataInputStream(FSDataInputStream stream, long fileLength, int batchSize) {
+			checkArgument(fileLength > 0L);
+			checkArgument(batchSize > 0);
+			this.stream = stream;
+			this.fileLength = fileLength;
+			this.batchSize = batchSize;
+		}
+
+		@Override
+		public void seek(long desired) throws IOException {
+			stream.seek(desired);
+			remainingInBatch = 0;  // after each seek, we need to start a new batch
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return stream.getPos();
+		}
+
+		@Override
+		public int read() throws IOException {
+			remainingInBatch--;
+			return stream.read();
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			remainingInBatch -= len;
+			return stream.read(b, off, len);
+		}
+
+		@Override
+		public void close() throws IOException {
+			stream.close();
+		}
+
+		boolean hasRemainingInBatch() {
+			return remainingInBatch > 0;
+		}
+
+		void newBatch() {
+			remainingInBatch = batchSize;
+		}
+
+		long getFileLength() {
+			return fileLength;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
new file mode 100644
index 0000000..5c378ed
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The {@code BulkFormat} reads and decodes batches of records at a time. Examples of bulk formats
+ * are formats like ORC or Parquet.
+ *
+ * <p>The outer {@code BulkFormat} class acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link BulkFormat.Reader}, which is created in the
+ * {@link BulkFormat#createReader(Configuration, Path, long, long)} method. If a bulk reader is created
+ * based on a checkpoint during checkpointed streaming execution, then the reader is re-created in
+ * the {@link BulkFormat#restoreReader(Configuration, Path, long, long, CheckpointedPosition)} method.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts, (like the next record delimiter, or a
+ * block start or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>The bulk reader returns an iterator per batch that it reads. The iterator produces records together
+ * with a position. That position is stored in the checkpointed state atomically with the processing of
+ * the record. That means it must be the position from where the reading can be resumed AFTER
+ * the record was processed; the position hence points effectively to the record AFTER the current record.
+ *
+ * <p>The simplest way to return this position information is to store no offset and simply store an
+ * incrementing count of records to skip after recovery. Given the above contract, the first record would
+ * be returned with a records-to-skip count of one, the second one with a count of two, etc.
+ *
+ * <p>Formats that have the ability to efficiently seek to a record (or to every n-th record) can use
+ * the position field to seek to a record directly and avoid having to read and discard many records
+ * on recovery.
+ *
+ * <p>Note on this design: Why do we not make the position point to the current record and always skip
+ * one record after recovery (the just processed record)? We need to be able to support formats where
+ * skipping records (even one) is not an option. For example formats that execute (pushed down) filters
+ * may want to avoid a skip-record-count all together, so that they don't skip the wrong records when
+ * the filter gets updated around a checkpoint/savepoint.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>For the {@code BulkFormat}, one batch (as returned by {@link BulkFormat.Reader#readBatch()}) is
+ * handed over as one.
+ */
+@PublicEvolving
+public interface BulkFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Creates a new reader that reads from {@code filePath} starting at {@code offset} and reads
+	 * until {@code length} bytes after the offset.
+	 */
+	BulkFormat.Reader<T> createReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength) throws IOException;
+
+	/**
+	 * Creates a new reader that reads from {@code filePath} starting at {@code offset} and reads
+	 * until {@code length} bytes after the offset. The reader is positioned according to the given
+	 * {@code checkpointedPosition}, for example by seeking to its offset and then reading and discarding
+	 * the number of records it indicates. This method is used when restoring a reader to a checkpointed
+	 * position.
+	 */
+	BulkFormat.Reader<T> restoreReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength,
+			CheckpointedPosition checkpointedPosition) throws IOException;
+
+	/**
+	 * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+	 * per file, so that Flink can read multiple regions of the file concurrently.
+	 *
+	 * <p>See {@link BulkFormat top-level JavaDocs} (section "Splitting") for details.
+	 */
+	boolean isSplittable();
+
+	/**
+	 * Gets the type produced by this format. This type will be the type produced by the file
+	 * source as a whole.
+	 */
+	@Override
+	TypeInformation<T> getProducedType();
+
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The actual reader that reads the batches of records.
+	 */
+	interface Reader<T> extends Closeable {
+
+		/**
+		 * Reads one batch. The method should return null when reaching the end of the input.
+		 * The returned batch will be handed over to the processing threads as one.
+		 *
+		 * <p>The returned iterator object and any contained objects may be held onto by the file source
+		 * for some time, so it should not be immediately reused by the reader.
+		 *
+		 * <p>To implement reuse and to save object allocation, consider using a
+		 * {@link org.apache.flink.connector.file.src.util.Pool} and recycle objects into the Pool in
+		 * the {@link RecordIterator#releaseBatch()} method.
+		 */
+		@Nullable
+		RecordIterator<T> readBatch() throws IOException;
+
+		/**
+		 * Closes the reader and should release all resources.
+		 */
+		@Override
+		void close() throws IOException;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An iterator over records with their position in the file.
+	 * The iterator can release its batch (see {@link #releaseBatch()}) to support clean resource release and recycling.
+	 *
+	 * @param <T> The type of the record.
+	 */
+	interface RecordIterator<T> {
+
+		/**
+		 * Gets the next record from the file, together with its position.
+		 *
+		 * <p>The position information returned with the record points to the record AFTER the returned
+		 * record, because it defines the point where the reading should resume once the current record
+		 * is emitted. The position information is put in the source's state when the record
+		 * is emitted. If a checkpoint is taken directly after the record is emitted, the checkpoint must
+		 * describe where to resume the source reading from after that record.
+		 *
+		 * <p>Objects returned by this method may be reused by the iterator. By the time that this
+		 * method is called again, no object returned from the previous call will be referenced any more.
+		 * That makes it possible to have a single {@link MutableRecordAndPosition} object and return the
+		 * same instance (with updated record and position) on every call.
+		 */
+		@Nullable
+		RecordAndPosition<T> next();
+
+		/**
+		 * Releases the batch that this iterator iterated over.
+		 * This is not supposed to close the reader and its resources, but is simply a signal that this
+		 * iterator is no longer used. This method can be used as a hook to recycle/reuse heavyweight
+		 * object structures.
+		 */
+		void releaseBatch();
+	}
+}
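
For illustration, a minimal consumption sketch of the BulkFormat.Reader contract above (not part of
this commit). It assumes the RecordAndPosition accessors (getRecord/getOffset/getRecordSkipCount) and
the two-argument CheckpointedPosition constructor defined elsewhere in this change.

    import java.io.IOException;
    import java.util.function.Consumer;

    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.connector.file.src.util.CheckpointedPosition;
    import org.apache.flink.connector.file.src.util.RecordAndPosition;

    final class BulkFormatExample {

        /** Drains a reader batch by batch and returns the position to resume from after the last record. */
        static <T> CheckpointedPosition drain(BulkFormat.Reader<T> reader, Consumer<T> emit) throws IOException {
            CheckpointedPosition resumePosition = null;
            BulkFormat.RecordIterator<T> batch;
            while ((batch = reader.readBatch()) != null) {
                RecordAndPosition<T> rec;
                while ((rec = batch.next()) != null) {
                    emit.accept(rec.getRecord());
                    // per the contract above, the position already points AFTER the emitted record
                    resumePosition = new CheckpointedPosition(rec.getOffset(), rec.getRecordSkipCount());
                }
                batch.releaseBatch();   // signal that the batch objects may be recycled
            }
            return resumePosition;
        }
    }
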
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java
new file mode 100644
index 0000000..a5f1def
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A reader format that reads individual records from a file.
+ *
+ * <p>This format is for cases where the readers need access to the file directly or need to create a custom
+ * stream. For readers that can work directly on input streams, consider using the {@link StreamFormat}, which
+ * is more robust.
+ *
+ * <p>The outer class {@code FileRecordFormat} acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link FileRecordFormat.Reader}, which is created for a given
+ * file path in the {@link #createReader(Configuration, Path, long, long)} method
+ * and restored (from checkpointed positions) in the method
+ * {@link #restoreReader(Configuration, Path, long, long, long)}.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts, (like the next record delimiter, or a
+ * block start or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>Readers can optionally return the current position of the reader, via the
+ * {@link FileRecordFormat.Reader#getCheckpointedPosition()}. This can improve recovery speed from
+ * a checkpoint.
+ *
+ * <p>By default (if that method is not overridden or returns null), recovery from a checkpoint
+ * works by reading the split again and skipping the number of records that were processed before
+ * the checkpoint. Implementing this method allows formats to seek directly to that position, rather
+ * than read and discard a number of records.
+ *
+ * <p>The position is a combination of offset in the file and a number of records to skip after
+ * this offset (see {@link CheckpointedPosition}). This helps formats that cannot describe all
+ * record positions by an offset, for example because records are compressed in batches or stored
+ * in a columnar layout (e.g., ORC, Parquet).
+ * The default behavior can be viewed as returning a {@code CheckpointedPosition} where the offset
+ * is always zero and only the {@link CheckpointedPosition#getRecordsAfterOffset()} is incremented
+ * with each emitted record.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>This batching is by default based on a number of records. See {@link FileRecordFormat#RECORDS_PER_FETCH}
+ * to configure that handover batch size.
+ */
+@PublicEvolving
+public interface FileRecordFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Creates a new reader to read in this format. This method is called when a fresh reader is
+	 * created for a split that was assigned from the enumerator. This method may also be called on
+	 * recovery from a checkpoint, if the reader never stored an offset in the checkpoint
+	 * (see {@link #restoreReader(Configuration, Path, long, long, long)} for details).
+	 *
+	 * <p>The {@code splitOffset} and {@code splitLength} describe the region of the file that belongs
+	 * to this split. For non-splittable formats, the split covers the entire file.
+	 */
+	FileRecordFormat.Reader<T> createReader(
+			Configuration config,
+			Path filePath,
+			long splitOffset,
+			long splitLength) throws IOException;
+
+	/**
+	 * Restores a reader from a checkpointed position. This method is called when the reader is recovered
+	 * from a checkpoint and the reader has previously stored an offset into the checkpoint, by returning
+	 * from the {@link FileRecordFormat.Reader#getCheckpointedPosition()} a value with non-negative
+	 * {@link CheckpointedPosition#getOffset() offset}. That value is supplied as the {@code restoredOffset}.
+	 *
+	 * <p>If the reader never produced a {@code CheckpointedPosition} with a non-negative offset before, then
+	 * this method is not called, and the reader is created in the same way as a fresh reader via the method
+	 * {@link #createReader(Configuration, Path, long, long)} and the appropriate number of
+	 * records are read and discarded, to position the reader at the checkpointed position.
+	 *
+	 * <p>The {@code splitOffset} and {@code splitLength} describe the region of the file that belongs
+	 * to this split. For non-splittable formats, the split covers the entire file.
+	 */
+	FileRecordFormat.Reader<T> restoreReader(
+			Configuration config,
+			Path filePath,
+			long restoredOffset,
+			long splitOffset,
+			long splitLength) throws IOException;
+
+	/**
+	 * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+	 * per file, so that Flink can read multiple regions of the file concurrently.
+	 *
+	 * <p>See {@link FileRecordFormat top-level JavaDocs} (section "Splitting") for details.
+	 */
+	boolean isSplittable();
+
+	/**
+	 * Gets the type produced by this format. This type will be the type produced by the file
+	 * source as a whole.
+	 */
+	@Override
+	TypeInformation<T> getProducedType();
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Config option for the number of records to hand over in each fetch.
+	 *
+	 * <p>The number should be large enough so that the thread-to-thread handover overhead
+	 * is amortized across the records, but small enough so that these records together do
+	 * not consume too much memory.
+	 */
+	ConfigOption<Integer> RECORDS_PER_FETCH = ConfigOptions
+		.key("source.file.records.fetch-size")
+		.intType()
+		.defaultValue(128)
+		.withDescription("The number of records to hand over from the I/O thread to the file reader in one unit.");
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The actual reader that reads the records.
+	 */
+	interface Reader<T> extends Closeable {
+
+		/**
+		 * Reads the next record. Returns {@code null} when the input has reached its end.
+		 */
+		@Nullable
+		T read() throws IOException;
+
+		/**
+		 * Closes the reader to release all resources.
+		 */
+		@Override
+		void close() throws IOException;
+
+		/**
+		 * Optionally returns the current position of the reader. This can be implemented by readers that
+		 * want to speed up recovery from a checkpoint.
+		 *
+		 * <p>The current position of the reader is the position of the next record that will be returned
+		 * in a call to {@link #read()}.
+		 *
+		 * <p>See the {@link FileRecordFormat top-level class comment} (section "Checkpointing") for details.
+		 */
+		@Nullable
+		default CheckpointedPosition getCheckpointedPosition() {
+			return null;
+		}
+	}
+}
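
For illustration, a hypothetical minimal implementation of the interface above (not part of this
commit): a non-splittable format that reads whole UTF-8 text lines and relies on the default
skip-records-on-recovery behavior, i.e. it never overrides getCheckpointedPosition().

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.reader.FileRecordFormat;
    import org.apache.flink.core.fs.Path;

    public final class WholeFileTextFormat implements FileRecordFormat<String> {

        @Override
        public Reader<String> createReader(
                Configuration config, Path filePath, long splitOffset, long splitLength) throws IOException {
            final BufferedReader lines = new BufferedReader(
                    new InputStreamReader(filePath.getFileSystem().open(filePath), StandardCharsets.UTF_8));
            return new Reader<String>() {
                @Override
                public String read() throws IOException {
                    return lines.readLine();   // null at end-of-input, as required by the contract
                }

                @Override
                public void close() throws IOException {
                    lines.close();
                }
            };
        }

        @Override
        public Reader<String> restoreReader(
                Configuration config, Path filePath, long restoredOffset,
                long splitOffset, long splitLength) throws IOException {
            // never invoked for this format: getCheckpointedPosition() is not overridden,
            // so recovery goes through createReader() plus skipping of already-processed records
            return createReader(config, filePath, splitOffset, splitLength);
        }

        @Override
        public boolean isSplittable() {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }
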
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java
new file mode 100644
index 0000000..938b40e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A simple version of the {@link StreamFormat}, for formats that are not splittable.
+ *
+ * <p>This format makes no distinction between creating readers from scratch (for a new file) or from a
+ * checkpoint. Because of that, if the reader actively checkpoints its position
+ * (via the {@link Reader#getCheckpointedPosition()} method), then the checkpointed offset must be
+ * a byte offset in the file from which the stream can be resumed as if it were the beginning of the file.
+ *
+ * <p>For all other details, please check the docs of {@link StreamFormat}.
+ *
+ * @param <T> The type of records created by this format reader.
+ */
+@PublicEvolving
+public abstract class SimpleStreamFormat<T> implements StreamFormat<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new reader. This method is called both for the creation of a new reader (from the beginning
+	 * of a file) and for restoring checkpointed readers.
+	 *
+	 * <p>If the reader previously checkpointed an offset, then the input stream will be positioned to
+	 * that particular offset. Readers checkpoint an offset by returning a value from the
+	 * {@link Reader#getCheckpointedPosition()} method with an offset other than
+	 * {@link CheckpointedPosition#NO_OFFSET}.
+	 */
+	public abstract Reader<T> createReader(Configuration config, FSDataInputStream stream) throws IOException;
+
+	/**
+	 * Gets the type produced by this format. This type will be the type produced by the file
+	 * source as a whole.
+	 */
+	@Override
+	public abstract TypeInformation<T> getProducedType();
+
+	// ------------------------------------------------------------------------
+	//  pre-defined methods from Stream Format
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This format is never splittable.
+	 */
+	@Override
+	public final boolean isSplittable() {
+		return false;
+	}
+
+	@Override
+	public final Reader<T> createReader(
+			Configuration config,
+			FSDataInputStream stream,
+			long fileLen,
+			long splitEnd) throws IOException {
+
+		checkNotSplit(fileLen, splitEnd);
+
+		final long streamPos = stream.getPos();
+		checkArgument(streamPos == 0L,
+				"SimpleStreamFormat is not splittable, but found non-zero stream position (%s)",
+				streamPos);
+
+		return createReader(config, stream);
+	}
+
+	@Override
+	public final Reader<T> restoreReader(
+			final Configuration config,
+			final FSDataInputStream stream,
+			final long restoredOffset,
+			final long fileLen,
+			final long splitEnd) throws IOException {
+
+		checkNotSplit(fileLen, splitEnd);
+		stream.seek(restoredOffset);
+		return createReader(config, stream);
+	}
+
+	private static void checkNotSplit(long fileLen, long splitEnd) {
+		if (splitEnd != fileLen) {
+			throw new IllegalArgumentException(String.format(
+					"SimpleStreamFormat is not splittable, but found split end (%d) different from file length (%d)",
+					splitEnd, fileLen));
+		}
+	}
+}
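
For illustration, a hypothetical SimpleStreamFormat sketch (not part of this commit): fixed-length
8-byte records, where every record starts at a known byte offset. Because of that, the reader can
checkpoint the byte offset of the next record, so that recovery seeks directly instead of re-reading
and discarding records. The two-argument CheckpointedPosition constructor is assumed from elsewhere
in this change.

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
    import org.apache.flink.connector.file.src.reader.StreamFormat;
    import org.apache.flink.connector.file.src.util.CheckpointedPosition;
    import org.apache.flink.core.fs.FSDataInputStream;

    public final class LongRecordFormat extends SimpleStreamFormat<Long> {

        private static final long serialVersionUID = 1L;

        @Override
        public Reader<Long> createReader(Configuration config, FSDataInputStream stream) throws IOException {
            final long startOffset = stream.getPos();           // non-zero when restored from a checkpoint
            final DataInputStream in = new DataInputStream(stream);

            return new StreamFormat.Reader<Long>() {
                private long nextOffset = startOffset;

                @Override
                public Long read() throws IOException {
                    try {
                        final long value = in.readLong();
                        nextOffset += Long.BYTES;
                        return value;
                    } catch (EOFException e) {
                        return null;                            // end of input
                    }
                }

                @Override
                public CheckpointedPosition getCheckpointedPosition() {
                    // byte offset of the next record; restoring seeks here directly
                    return new CheckpointedPosition(nextOffset, 0L);
                }

                @Override
                public void close() throws IOException {
                    in.close();
                }
            };
        }

        @Override
        public TypeInformation<Long> getProducedType() {
            return Types.LONG;
        }
    }
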
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java
new file mode 100644
index 0000000..57424b2
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A reader format that reads individual records from a stream.
+ *
+ * <p>The outer class {@code StreamFormat} acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link StreamFormat.Reader}, which is created based on an
+ * input stream in the {@link #createReader(Configuration, FSDataInputStream, long, long)} method
+ * and restored (from checkpointed positions) in the method
+ * {@link #restoreReader(Configuration, FSDataInputStream, long, long, long)}.
+ *
+ * <p>Compared to the {@link BulkFormat}, the stream format handles a few things out-of-the-box, like
+ * deciding how to batch records or dealing with compression.
+ *
+ * <p>For a simpler version of this interface, for formats that do not support splitting or logical record
+ * offsets during checkpointing, see {@link SimpleStreamFormat}.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts, (like the next record delimiter, or a
+ * block start or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>Readers can optionally return the current position of the reader, via the
+ * {@link StreamFormat.Reader#getCheckpointedPosition()}. This can improve recovery speed from
+ * a checkpoint.
+ *
+ * <p>By default (if that method is not overridden or returns null), recovery from a checkpoint
+ * works by reading the split again and skipping the number of records that were processed before
+ * the checkpoint. Implementing this method allows formats to seek directly to that position, rather
+ * than read and discard a number of records.
+ *
+ * <p>The position is a combination of offset in the file and a number of records to skip after
+ * this offset (see {@link CheckpointedPosition}). This helps formats that cannot describe all
+ * record positions by an offset, for example because records are compressed in batches or stored
+ * in a columnar layout (e.g., ORC, Parquet).
+ * The default behavior can be viewed as returning a {@code CheckpointedPosition} where the offset
+ * is always zero and only the {@link CheckpointedPosition#getRecordsAfterOffset()} is incremented
+ * with each emitted record.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>This batching is by default based on I/O fetch size for the {@code StreamFormat}, meaning
+ * the set of records derived from one I/O buffer will be handed over as one.
+ * See {@link StreamFormat#FETCH_IO_SIZE} to configure that fetch size.
+ *
+ * @param <T> The type of records created by this format reader.
+ */
+@PublicEvolving
+public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Creates a new reader to read in this format. This method is called when a fresh reader is
+	 * created for a split that was assigned from the enumerator. This method may also be called on
+	 * recovery from a checkpoint, if the reader never stored an offset in the checkpoint
+	 * (see {@link #restoreReader(Configuration, FSDataInputStream, long, long, long)} for details).
+	 *
+	 * <p>If the format is {@link #isSplittable() splittable}, then the {@code stream} is positioned
+	 * to the beginning of the file split, otherwise it will be at position zero.
+	 *
+	 * <p>The {@code fileLen} is the length of the entire file, while {@code splitEnd} is the offset of
+	 * the first byte after the split end boundary (exclusive end boundary). For non-splittable formats,
+	 * both values are identical.
+	 */
+	Reader<T> createReader(
+			Configuration config,
+			FSDataInputStream stream,
+			long fileLen,
+			long splitEnd) throws IOException;
+
+	/**
+	 * Restores a reader from a checkpointed position. This method is called when the reader is recovered
+	 * from a checkpoint and the reader has previously stored an offset into the checkpoint, by returning
+	 * from the {@link Reader#getCheckpointedPosition()} a value with non-negative
+	 * {@link CheckpointedPosition#getOffset() offset}. That value is supplied as the {@code restoredOffset}.
+	 *
+	 * <p>If the format is {@link #isSplittable() splittable}, then the {@code stream} is positioned
+	 * to the beginning of the file split, otherwise it will be at position zero. The stream is NOT
+	 * positioned to the checkpointed offset, because the format is free to interpret this offset in
+	 * a different way than the byte offset in the file (for example as a record index).
+	 *
+	 * <p>If the reader never produced a {@code CheckpointedPosition} with a non-negative offset before, then
+	 * this method is not called, and the reader is created in the same way as a fresh reader via the method
+	 * {@link #createReader(Configuration, FSDataInputStream, long, long)} and the appropriate number of
+	 * records are read and discarded, to position the reader at the checkpointed position.
+	 *
+	 * <p>Having a different method for restoring readers to a checkpointed position allows readers to
+	 * seek to the start position differently in that case, compared to when the reader is created from
+	 * a split offset generated at the enumerator. In the latter case, the offsets are commonly "approximate",
+	 * because the enumerator typically generates splits based only on metadata. Readers then have to skip some
+	 * bytes while searching for the next position to start from (based on a delimiter, sync marker, block
+	 * offset, etc.). In contrast, checkpointed offsets are often precise, because they were recorded as the
+	 * reader went through the data stream. Starting a reader from a checkpointed offset may hence not
+	 * require any search for the next delimiter/block/marker.
+	 *
+	 * <p>The {@code fileLen} is the length of the entire file, while {@code splitEnd} is the offset of
+	 * the first byte after the split end boundary (exclusive end boundary). For non-splittable formats,
+	 * both values are identical.
+	 */
+	Reader<T> restoreReader(
+			Configuration config,
+			FSDataInputStream stream,
+			long restoredOffset,
+			long fileLen,
+			long splitEnd) throws IOException;
+
+	/**
+	 * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+	 * per file, so that Flink can read multiple regions of the file concurrently.
+	 *
+	 * <p>See {@link StreamFormat top-level JavaDocs} (section "Splitting") for details.
+	 */
+	boolean isSplittable();
+
+	/**
+	 * Gets the type produced by this format. This type will be the type produced by the file
+	 * source as a whole.
+	 */
+	@Override
+	TypeInformation<T> getProducedType();
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The config option defining how many bytes are read by the I/O thread in one fetch operation.
+	 */
+	ConfigOption<MemorySize> FETCH_IO_SIZE = ConfigOptions
+			.key("source.file.stream.io-fetch-size")
+			.memoryType()
+			.defaultValue(MemorySize.ofMebiBytes(1L))
+			.withDescription("The approximate number of bytes per fetch that is passed from the I/O thread to the file reader.");
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The actual reader that reads the records.
+	 */
+	interface Reader<T> extends Closeable {
+
+		/**
+		 * Reads the next record. Returns {@code null} when the input has reached its end.
+		 */
+		@Nullable
+		T read() throws IOException;
+
+		/**
+		 * Closes the reader to release all resources.
+		 */
+		@Override
+		void close() throws IOException;
+
+		/**
+		 * Optionally returns the current position of the reader. This can be implemented by readers that
+		 * want to speed up recovery from a checkpoint.
+		 *
+		 * <p>The current position of the reader is the position of the next record that will be returned
+		 * in a call to {@link #read()}.
+		 *
+		 * <p>See the {@link StreamFormat top-level class comment} (section "Checkpointing") for details.
+		 */
+		@Nullable
+		default CheckpointedPosition getCheckpointedPosition() {
+			return null;
+		}
+	}
+}
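
For illustration, a brief usage sketch (not part of this commit) of the fetch-size option defined
above; the StreamFormatAdapter earlier in this change reads this value when it opens the file stream.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.connector.file.src.reader.StreamFormat;

    final Configuration config = new Configuration();
    // hand over roughly 4 MiB of input per fetch instead of the 1 MiB default
    config.set(StreamFormat.FETCH_IO_SIZE, MemorySize.ofMebiBytes(4));
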
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
new file mode 100644
index 0000000..4eccd36
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * A reader format that reads text lines from a file.
+ *
+ * <p>The reader uses Java's built-in {@link InputStreamReader} to decode the byte stream using various
+ * supported charset encodings.
+ *
+ * <p>This format does not support optimized recovery from checkpoints. On recovery, it will re-read and
+ * discard the number of lines that were processed before the last checkpoint. That is due to the fact
+ * that the offsets of lines in the file cannot be tracked through the charset decoders with their
+ * internal buffering of stream input and charset decoder state.
+ */
+@PublicEvolving
+public class TextLineFormat extends SimpleStreamFormat<String> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final String DEFAULT_CHARSET_NAME = "UTF-8";
+
+	private final String charsetName;
+
+	public TextLineFormat() {
+		this(DEFAULT_CHARSET_NAME);
+	}
+
+	public TextLineFormat(String charsetName) {
+		this.charsetName = charsetName;
+	}
+
+	@Override
+	public Reader createReader(Configuration config, FSDataInputStream stream) throws IOException {
+		final BufferedReader reader = new BufferedReader(new InputStreamReader(stream, charsetName));
+		return new Reader(reader);
+	}
+
+	@Override
+	public TypeInformation<String> getProducedType() {
+		return Types.STRING;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The actual reader for the {@code TextLineFormat}.
+	 */
+	public static final class Reader implements StreamFormat.Reader<String> {
+
+		private final BufferedReader reader;
+
+		Reader(final BufferedReader reader) {
+			this.reader = reader;
+		}
+
+		@Nullable
+		@Override
+		public String read() throws IOException {
+			return reader.readLine();
+		}
+
+		@Override
+		public void close() throws IOException {
+			reader.close();
+		}
+	}
+}
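
For illustration, a test-style sketch (not part of this commit) that reads a local file with the
TextLineFormat above, wrapped in the internal StreamFormatAdapter from earlier in this change. The
file path is hypothetical, the whole file is treated as a single split, and
RecordAndPosition#getRecord() is assumed as defined elsewhere in this change.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.connector.file.src.reader.TextLineFormat;
    import org.apache.flink.connector.file.src.util.RecordAndPosition;
    import org.apache.flink.core.fs.Path;

    public final class TextLineReadingExample {
        public static void main(String[] args) throws Exception {
            final Path path = new Path("file:///tmp/lines.txt");   // hypothetical, non-empty input file
            final long fileLen = path.getFileSystem().getFileStatus(path).getLen();

            final BulkFormat<String> format = new StreamFormatAdapter<>(new TextLineFormat());
            try (BulkFormat.Reader<String> reader =
                    format.createReader(new Configuration(), path, 0L, fileLen)) {

                BulkFormat.RecordIterator<String> batch;
                while ((batch = reader.readBatch()) != null) {
                    RecordAndPosition<String> record;
                    while ((record = batch.next()) != null) {
                        System.out.println(record.getRecord());
                    }
                    batch.releaseBatch();
                }
            }
        }
    }
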
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java
new file mode 100644
index 0000000..af500c5
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns the elements of an array, one after the other.
+ * The iterator is mutable to support object reuse and supports recycling.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class ArrayResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+	private E[] records;
+	private int num;
+	private int pos;
+
+	private final MutableRecordAndPosition<E> recordAndPosition;
+
+	/**
+	 * Creates a new, initially empty, {@code ArrayResultIterator}.
+	 */
+	public ArrayResultIterator() {
+		this(null);
+	}
+
+	/**
+	 * Creates a new, initially empty, {@code ArrayResultIterator}.
+	 * The iterator calls the given {@code recycler} when the {@link #releaseBatch()} method is called.
+	 */
+	public ArrayResultIterator(@Nullable Runnable recycler) {
+		super(recycler);
+		this.recordAndPosition = new MutableRecordAndPosition<>();
+	}
+
+	// -------------------------------------------------------------------------
+	//  Setting
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Sets the records to be returned by this iterator.
+	 * Each record's {@link RecordAndPosition} will have the same offset (for {@link RecordAndPosition#getOffset()}).
+	 * The first returned record will have a records-to-skip count of {@code skipCountOfFirst + 1}, following
+	 * the contract that each record needs to point to the position AFTER itself
+	 * (because a checkpoint taken after the record was emitted needs to resume from after that record).
+	 */
+	public void set(final E[] records, final int num, final long offset, final long skipCountOfFirst) {
+		this.records = records;
+		this.num = num;
+		this.pos = 0;
+		this.recordAndPosition.set(null, offset, skipCountOfFirst);
+	}
+
+	// -------------------------------------------------------------------------
+	//  Result Iterator Methods
+	// -------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public RecordAndPosition<E> next() {
+		if (pos < num) {
+			recordAndPosition.setNext(records[pos++]);
+			return recordAndPosition;
+		}
+		else {
+			return null;
+		}
+	}
+}
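
As a quick illustration of the set(...) contract above (a minimal sketch with a hypothetical class name and values, not taken from this patch), a BulkFormat reader could fill and drain the iterator like this; the position reported with each record points AFTER that record:

    import org.apache.flink.connector.file.src.util.ArrayResultIterator;
    import org.apache.flink.connector.file.src.util.RecordAndPosition;

    public class ArrayResultIteratorSketch {
        public static void main(String[] args) {
            final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
            // a batch of two records starting at byte offset 0 of the split,
            // with no records to skip before the first one
            iter.set(new String[] {"a", "b"}, 2, 0L, 0L);

            RecordAndPosition<String> rec;
            while ((rec = iter.next()) != null) {
                System.out.println(rec); // prints "a @ 0 + 1", then "b @ 0 + 2"
            }
            iter.releaseBatch(); // no-op here, since no recycler was configured
        }
    }
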
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java
new file mode 100644
index 0000000..bbdd09c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The position of a reader, to be stored in a checkpoint. The position consists of a record offset
+ * and a number of records to skip after that offset. The offset is optional; it may take the value
+ * {@link CheckpointedPosition#NO_OFFSET}, in which case only the records-to-skip count is used.
+ *
+ * <p>The combination of offset and records-to-skip makes it possible to represent the position of a wide
+ * variety of readers. In the simplest case, readers might store no offset and only store how many
+ * records they previously returned. On the other hand, readers that can precisely point to each
+ * record via a position can store that in the checkpoint. Readers that have occasional addressable positions
+ * (like sync markers, block starts, etc.) can store those together with the records skipped after the
+ * last marker.
+ */
+@PublicEvolving
+public final class CheckpointedPosition implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Constant for the offset, reflecting that the position does not contain any offset information.
+	 * It is used in positions that are defined only by a number of records to skip.
+	 */
+	public static final long NO_OFFSET = -1L;
+
+	private final long offset;
+	private final long recordsAfterOffset;
+
+	/**
+	 * Creates a new CheckpointedPosition for given offset and records-to-skip.
+	 *
+	 * @param offset The offset that the reader will seek to when restored from this checkpoint.
+	 * @param recordsAfterOffset The records to skip after the offset.
+	 */
+	public CheckpointedPosition(long offset, long recordsAfterOffset) {
+		checkArgument(offset >= -1, "offset must be >= 0 or NO_OFFSET");
+		checkArgument(recordsAfterOffset >= 0, "recordsAfterOffset must be >= 0");
+		this.offset = offset;
+		this.recordsAfterOffset = recordsAfterOffset;
+	}
+
+	/**
+	 * Gets the offset that the reader will seek to when restored from this checkpoint.
+	 */
+	public long getOffset() {
+		return offset;
+	}
+
+	/**
+	 * Gets the records to skip after the offset.
+	 */
+	public long getRecordsAfterOffset() {
+		return recordsAfterOffset;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final CheckpointedPosition that = (CheckpointedPosition) o;
+		return offset == that.offset &&
+			recordsAfterOffset == that.recordsAfterOffset;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(offset, recordsAfterOffset);
+	}
+
+	@Override
+	public String toString() {
+		return String.format("CheckpointedPosition: offset=%s, recordsToSkip=%d",
+				offset == NO_OFFSET ? "NO_OFFSET" : String.valueOf(offset),
+				recordsAfterOffset);
+	}
+}
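
To make the offset/records-to-skip combination concrete, here is a hedged sketch of how a reader could interpret such a position on restore (the seekTo and readAndDiscardRecord helpers are assumed placeholders for format-specific logic, not part of this patch):

    import org.apache.flink.connector.file.src.util.CheckpointedPosition;

    public class RestoreSketch {

        static void restoreFrom(CheckpointedPosition position) throws Exception {
            if (position.getOffset() != CheckpointedPosition.NO_OFFSET) {
                // jump to the last addressable position (e.g. a block start or sync marker)
                seekTo(position.getOffset());
            }
            // re-read and discard the records that were already emitted before the checkpoint
            for (long i = 0; i < position.getRecordsAfterOffset(); i++) {
                readAndDiscardRecord();
            }
        }

        static void seekTo(long offset) { /* format-specific */ }

        static void readAndDiscardRecord() { /* format-specific */ }
    }
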
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java
new file mode 100644
index 0000000..5369e54
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns the elements of an iterator,
+ * augmented with position information.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class IteratorResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+	private final Iterator<E> records;
+
+	private final MutableRecordAndPosition<E> recordAndPosition;
+
+	/**
+	 * Creates a new {@code RecordIterator} returning the records from the given iterator, augmented
+	 * with their position information.
+	 *
+	 * <p>Each record's {@link RecordAndPosition} will have the same offset value for
+	 * {@link RecordAndPosition#getOffset()}. The first returned record will have a records-to-skip count
+	 * of {@code startingSkipCount + 1}, following the contract that each record needs to point to the
+	 * position AFTER itself (because a checkpoint taken after the record was emitted needs to resume
+	 * from after that record).
+	 */
+	public IteratorResultIterator(
+			final Iterator<E> records,
+			final long offset,
+			final long startingSkipCount) {
+		this(records, offset, startingSkipCount, null);
+	}
+
+	/**
+	 * Creates a new {@code RecordIterator} returning the records from the given iterator, augmented
+	 * with their position information. When the iterator is marked as done (via {@link #releaseBatch()}), the
+	 * given {@code recycler} is called.
+	 *
+	 * <p>Each record's {@link RecordAndPosition} will have the same offset value for
+	 * {@link RecordAndPosition#getOffset()}. The first returned record will have a records-to-skip count
+	 * of {@code startingSkipCount + 1}, following the contract that each record needs to point to the
+	 * position AFTER itself (because a checkpoint taken after the record was emitted needs to resume
+	 * from after that record).
+	 */
+	public IteratorResultIterator(
+			final Iterator<E> records,
+			final long offset,
+			final long startingSkipCount,
+			final @Nullable Runnable recycler) {
+
+		super(recycler);
+		this.records = records;
+		this.recordAndPosition = new MutableRecordAndPosition<>();
+		this.recordAndPosition.setPosition(offset, startingSkipCount);
+	}
+
+	// -------------------------------------------------------------------------
+	//  Result Iterator Methods
+	// -------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public RecordAndPosition<E> next() {
+		if (records.hasNext()) {
+			recordAndPosition.setNext(records.next());
+			return recordAndPosition;
+		} else {
+			return null;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java
new file mode 100644
index 0000000..07de831
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+/**
+ * A mutable version of the {@link RecordAndPosition}.
+ *
+ * <p>This mutable object is useful in cases where only one instance of a {@code RecordAndPosition}
+ * is needed at a time, like for the result values of the {@link BulkFormat.RecordIterator}.
+ */
+@PublicEvolving
+public class MutableRecordAndPosition<E> extends RecordAndPosition<E> {
+
+	/**
+	 * Updates the record and position in this object.
+	 */
+	public void set(E record, long offset, long recordSkipCount) {
+		this.record = record;
+		this.offset = offset;
+		this.recordSkipCount = recordSkipCount;
+	}
+
+	/**
+	 * Sets the position without setting a record.
+	 */
+	public void setPosition(long offset, long recordSkipCount) {
+		this.offset = offset;
+		this.recordSkipCount = recordSkipCount;
+	}
+
+	/**
+	 * Sets the next record of a sequence. This increments the {@code recordSkipCount} by one.
+	 */
+	public void setNext(E record) {
+		this.record = record;
+		this.recordSkipCount++;
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java
new file mode 100644
index 0000000..274e775
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * A pool to cache and recycle heavyweight objects, to reduce object allocation.
+ *
+ * <p>This pool can be used in the {@link BulkFormat.Reader}, when the returned objects are heavyweight
+ * and need to be reused for efficiency. Because the reading happens in I/O threads while the record
+ * processing happens in Flink's main processing threads, these objects cannot be reused immediately
+ * after being returned. They can be reused once they are recycled back to the pool.
+ *
+ * @param <T> The type of object cached in the pool.
+ */
+@PublicEvolving
+public class Pool<T> {
+
+	private final ArrayBlockingQueue<T> pool;
+
+	private final Recycler<T> recycler;
+
+	private final int poolCapacity;
+	private int poolSize;
+
+	/**
+	 * Creates a pool with the given capacity. No more than that many elements may be added to the pool.
+	 */
+	public Pool(int poolCapacity) {
+		this.pool = new ArrayBlockingQueue<>(poolCapacity);
+		this.recycler = this::addBack;
+		this.poolCapacity = poolCapacity;
+		this.poolSize = 0;
+	}
+
+	/**
+	 * Gets the recycler for this pool. The recycler returns its given objects back to this pool.
+	 */
+	public Recycler<T> recycler() {
+		return recycler;
+	}
+
+	/**
+	 * Adds an entry to the pool.
+	 * This method fails if called more times than the pool capacity specified during construction.
+	 */
+	public synchronized void add(T object) {
+		if (poolSize >= poolCapacity) {
+			throw new IllegalStateException("No space left in pool");
+		}
+		poolSize++;
+
+		addBack(object);
+	}
+
+	/**
+	 * Gets the next cached entry. This blocks until the next entry is available.
+	 */
+	public T pollEntry() throws InterruptedException {
+		return pool.take();
+	}
+
+	/**
+	 * Tries to get the next cached entry. If the pool is empty, this method returns null.
+	 */
+	@Nullable
+	public T tryPollEntry() {
+		return pool.poll();
+	}
+
+	/**
+	 * Internal callback to put an entry back to the pool.
+	 */
+	void addBack(T object) {
+		pool.add(object);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * A Recycler puts objects into the pool that the recycler is associated with.
+	 *
+	 * @param <T> The pooled and recycled type.
+	 */
+	@FunctionalInterface
+	public interface Recycler<T> {
+
+		/**
+		 * Recycles the given object to the pool that this recycler works with.
+		 */
+		void recycle(T object);
+	}
+}
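
A minimal sketch of the intended usage described in the class javadoc (hypothetical buffer type and flow, not part of this patch): the I/O thread takes pooled objects, and the recycler returns them once the downstream consumer releases the batch.

    import org.apache.flink.connector.file.src.util.Pool;

    public class PoolSketch {
        public static void main(String[] args) throws InterruptedException {
            final Pool<byte[]> pool = new Pool<>(2);
            pool.add(new byte[1024]);
            pool.add(new byte[1024]);

            final byte[] buffer = pool.pollEntry(); // blocks until an object is available
            // ... fill the buffer in the I/O thread and hand it downstream ...

            // later, typically from a batch's recycler callback, return it to the pool
            pool.recycler().recycle(buffer);
        }
    }
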
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java
new file mode 100644
index 0000000..4c5079c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A record, together with the reader position to be stored in the checkpoint.
+ *
+ * <p>The position defines the point in the reader AFTER the record. Record processing and
+ * updating checkpointed state happens atomically. The position points to where the reader should
+ * resume after this record is processed.
+ *
+ * <p>For example, the very first record in a file split could have an {@code offset} of zero and a
+ * {@code recordSkipCount} of one.
+ *
+ * <p>This class is immutable for safety. Use {@link MutableRecordAndPosition} if you need a mutable
+ * version for efficiency.
+ *
+ * <p>Note on this design: Why do we not make the position point to the current record and always skip
+ * one record after recovery (the just processed record)? We need to be able to support formats where
+ * skipping records (even one) is not an option. For example, formats that execute (pushed down) filters
+ * may want to avoid a skip-record-count altogether, so that they don't skip the wrong records when
+ * the filter gets updated around a checkpoint/savepoint.
+ */
+@PublicEvolving
+public class RecordAndPosition<E> {
+
+	/**
+	 * Constant for the offset, reflecting that the position does not contain any offset information.
+	 * It is used in positions that are defined only by a number of records to skip.
+	 */
+	public static final long NO_OFFSET = CheckpointedPosition.NO_OFFSET;
+
+	// these are package private and non-final so we can mutate them from the MutableRecordAndPosition
+	E record;
+	long offset;
+	long recordSkipCount;
+
+	/**
+	 * Creates a new {@code RecordAndPosition} with the given record and position info.
+	 */
+	public RecordAndPosition(E record, long offset, long recordSkipCount) {
+		this.record = record;
+		this.offset = offset;
+		this.recordSkipCount = recordSkipCount;
+	}
+
+	/**
+	 * Package private constructor for the {@link MutableRecordAndPosition}.
+	 */
+	RecordAndPosition() {}
+
+	// ------------------------------------------------------------------------
+
+	public E getRecord() {
+		return record;
+	}
+
+	public long getOffset() {
+		return offset;
+	}
+
+	public long getRecordSkipCount() {
+		return recordSkipCount;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return String.format("%s @ %d + %d", record, offset, recordSkipCount);
+	}
+}
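
As a hedged sketch (not taken from this patch), the position emitted with a record can be copied straight into a {@code CheckpointedPosition} at checkpoint time, because it already points after that record:

    import org.apache.flink.connector.file.src.util.CheckpointedPosition;
    import org.apache.flink.connector.file.src.util.RecordAndPosition;

    public class PositionSketch {

        static CheckpointedPosition toCheckpointedPosition(RecordAndPosition<?> lastEmitted) {
            // the position stored with the record already points AFTER that record,
            // so it can be used verbatim as the restore position
            return new CheckpointedPosition(lastEmitted.getOffset(), lastEmitted.getRecordSkipCount());
        }
    }
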
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java
new file mode 100644
index 0000000..47cf967
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility base class for iterators that accept a recycler.
+ *
+ * @param <E> The type of the records returned by the iterator.
+ */
+@Internal
+abstract class RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+	@Nullable
+	private final Runnable recycler;
+
+	/**
+	 * Creates a {@code RecyclableIterator} with the given optional recycler.
+	 */
+	protected RecyclableIterator(@Nullable Runnable recycler) {
+		this.recycler = recycler;
+	}
+
+	@Override
+	public void releaseBatch() {
+		if (recycler != null) {
+			recycler.run();
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java
new file mode 100644
index 0000000..7b56e5c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns a single value.
+ * The iterator is mutable to support object reuse and supports recycling.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class SingletonResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+	@Nullable
+	private RecordAndPosition<E> element;
+
+	private final MutableRecordAndPosition<E> recordAndPosition;
+
+	/**
+	 * Creates a new, initially empty, {@code SingletonResultIterator}.
+	 */
+	public SingletonResultIterator() {
+		this(null);
+	}
+
+	/**
+	 * Creates a new, initially empty, {@code SingletonResultIterator}.
+	 * The iterator calls the given {@code recycler} when the {@link #releaseBatch()} method is called.
+	 */
+	public SingletonResultIterator(@Nullable Runnable recycler) {
+		super(recycler);
+		this.recordAndPosition = new MutableRecordAndPosition<>();
+	}
+
+	// -------------------------------------------------------------------------
+	//  Setting
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Sets the single record to be returned by this iterator.
+	 * The offset and records-to-skip count will be used as provided here for the returned
+	 * {@link RecordAndPosition}, meaning they need to point to AFTER this specific record
+	 * (because a checkpoint taken after the record was processed needs to resume from after this record).
+	 */
+	public void set(final E element, final long offset, final long skipCount) {
+		this.recordAndPosition.set(element, offset, skipCount);
+		this.element = this.recordAndPosition;
+	}
+
+	// -------------------------------------------------------------------------
+	//  Result Iterator Methods
+	// -------------------------------------------------------------------------
+
+	@Nullable
+	@Override
+	public RecordAndPosition<E> next() {
+		final RecordAndPosition<E> next = this.element;
+		this.element = null;
+		return next;
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java
new file mode 100644
index 0000000..bdadc1b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Miscellaneous utilities for the file source.
+ */
+@PublicEvolving
+public final class Utils {
+
+	/**
+	 * Runs the given {@code SupplierWithException} (a piece of code producing a result).
+	 * If an exception happens during that, the given closeable is quietly closed and the exception is rethrown.
+	 */
+	public static <E> E doWithCleanupOnException(
+			final Closeable toCleanUp,
+			final SupplierWithException<E, IOException> code) throws IOException {
+
+		try {
+			return code.get();
+		}
+		catch (Throwable t) {
+			IOUtils.closeQuietly(toCleanUp);
+			ExceptionUtils.rethrowIOException(t);
+			return null; // silence the compiler
+		}
+	}
+
+	/**
+	 * Runs the given {@code ThrowingRunnable}. If an exception happens during that, the given closeable
+	 * is quietly closed and the exception is rethrown.
+	 */
+	public static void doWithCleanupOnException(
+			final Closeable toCleanUp,
+			final ThrowingRunnable<IOException> code) throws IOException {
+
+		doWithCleanupOnException(toCleanUp, (SupplierWithException<Void, IOException>) () -> {
+			code.run();
+			return null;
+		});
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated. */
+	private Utils() {}
+}
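
A minimal usage sketch of doWithCleanupOnException (hypothetical method and stream handling, not part of this patch), ensuring an already-opened stream is closed if the wrapping step fails:

    import org.apache.flink.connector.file.src.util.Utils;
    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.Path;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    public class UtilsSketch {

        static BufferedReader openReader(Path path) throws IOException {
            final FSDataInputStream in = path.getFileSystem().open(path);
            // if creating the reader throws, the stream is closed quietly and the exception rethrown
            return Utils.doWithCleanupOnException(in, () ->
                    new BufferedReader(new InputStreamReader(in)));
        }
    }
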
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
new file mode 100644
index 0000000..278d214
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.After;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * This test simulates readers that just produce byte arrays very fast. The test
+ * is meant to check that this does not break the system in terms of object allocations, etc.
+ */
+public class FileSourceHeavyThroughputTest {
+
+	/** Testing file system reference, to be cleaned up in an @After method. That way it also gets
+	 * cleaned up on a test failure, without needing finally clauses in every test. */
+	private TestingFileSystem testFs;
+
+	@After
+	public void unregisterTestFs() throws Exception {
+		if (testFs != null) {
+			testFs.unregister();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testHeavyThroughput() throws Exception {
+		final Path path = new Path("testfs:///testpath");
+		final long fileSize = 20L << 30; // 20 GB
+		final FileSourceSplit split = new FileSourceSplit("testsplitId", path, 0, fileSize);
+
+		testFs = TestingFileSystem.createForFileStatus(
+				path.toUri().getScheme(),
+				TestingFileSystem.TestFileStatus.forFileWithStream(path, fileSize, new GeneratingInputStream(fileSize)));
+		testFs.register();
+
+		final FileSource<byte[]> source = FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+		final SourceReader<byte[], FileSourceSplit> reader = source.createReader(new NoOpReaderContext());
+		reader.addSplits(Collections.singletonList(split));
+		reader.handleSourceEvents(new NoMoreSplitsEvent());
+
+		final ReaderOutput<byte[]> out = new NoOpReaderOutput<>();
+
+		InputStatus status;
+		while ((status = reader.pollNext(out)) != InputStatus.END_OF_INPUT) {
+			// if nothing is available currently, wait for more
+			if (status == InputStatus.NOTHING_AVAILABLE) {
+				reader.isAvailable().get();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class ArrayReader implements StreamFormat.Reader<byte[]> {
+
+		private static final int ARRAY_SIZE = 1 << 20; // 1 MiByte
+
+		private final FSDataInputStream in;
+
+		ArrayReader(FSDataInputStream in) {
+			this.in = in;
+		}
+
+		@Nullable
+		@Override
+		public byte[] read() throws IOException {
+			final byte[] array = new byte[ARRAY_SIZE];
+			final int read = in.read(array);
+			if (read == array.length) {
+				return array;
+			} else if (read == -1) {
+				return null;
+			} else {
+				return Arrays.copyOf(array, read);
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			in.close();
+		}
+	}
+
+	private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream) throws IOException {
+			return new ArrayReader(stream);
+		}
+
+		@Override
+		public TypeInformation<byte[]> getProducedType() {
+			return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+		}
+	}
+
+	private static final class GeneratingInputStream extends FSDataInputStream {
+
+		private final long length;
+		private long pos;
+
+		GeneratingInputStream(long length) {
+			this.length = length;
+		}
+
+		@Override
+		public void seek(long desired) throws IOException {
+			checkArgument(desired >= 0 && desired <= length);
+			pos = desired;
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return pos;
+		}
+
+		@Override
+		public int read() throws IOException {
+			if (pos < length) {
+				pos++;
+				return 0;
+			} else {
+				return -1;
+			}
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			if (pos < length) {
+				final int remaining = (int) Math.min(len, length - pos);
+				pos += remaining;
+				return remaining;
+			} else {
+				return -1;
+			}
+		}
+	}
+
+	private static final class NoOpReaderContext implements SourceReaderContext {
+
+		@Override
+		public MetricGroup metricGroup() {
+			return new UnregisteredMetricsGroup();
+		}
+
+		@Override
+		public Configuration getConfiguration() {
+			return new Configuration();
+		}
+
+		@Override
+		public String getLocalHostName() {
+			return "localhost";
+		}
+
+		@Override
+		public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+	}
+
+	private static final class NoOpReaderOutput<E> implements ReaderOutput<E> {
+
+		@Override
+		public void collect(E record) {}
+
+		@Override
+		public void collect(E record, long timestamp) {}
+
+		@Override
+		public void emitWatermark(Watermark watermark) {}
+
+		@Override
+		public void markIdle() {}
+
+		@Override
+		public SourceOutput<E> createOutputForSplit(String splitId) {
+			return this;
+		}
+
+		@Override
+		public void releaseOutputForSplit(String splitId) {}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java
new file mode 100644
index 0000000..16f5720
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Unit tests for the {@link FileSourceSplitSerializer}.
+ */
+public class FileSourceSplitSerializerTest {
+
+	@Test
+	public void serializeSplitWithHosts() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"random-id",
+			new Path("hdfs://namenode:14565/some/path/to/a/file"),
+			100_000_000,
+			64_000_000,
+			"host1", "host2", "host3");
+
+		final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+		assertSplitsEqual(split, deSerialized);
+	}
+
+	@Test
+	public void serializeSplitWithoutHosts() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"some-id",
+			new Path("file:/some/path/to/a/file"),
+			0,
+			0);
+
+		final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+		assertSplitsEqual(split, deSerialized);
+	}
+
+	@Test
+	public void serializeSplitWithReaderPosition() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"random-id",
+			new Path("hdfs://namenode:14565/some/path/to/a/file"),
+			100_000_000,
+			64_000_000,
+			new String[] {"host1", "host2", "host3"},
+			new CheckpointedPosition(7665391L, 100L));
+
+		final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+		assertSplitsEqual(split, deSerialized);
+	}
+
+	@Test
+	public void repeatedSerialization() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"an-id",
+			new Path("s3://some-bucket/key/to/the/object"),
+			0,
+			1234567);
+
+		serializeAndDeserialize(split);
+		serializeAndDeserialize(split);
+		final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+		assertSplitsEqual(split, deSerialized);
+	}
+
+	@Test
+	public void repeatedSerializationCaches() throws Exception {
+		final FileSourceSplit split = new FileSourceSplit(
+			"random-id",
+			new Path("hdfs://namenode:14565/some/path/to/a/file"),
+			100_000_000,
+			64_000_000,
+			"host1", "host2", "host3");
+
+		final byte[] ser1 = FileSourceSplitSerializer.INSTANCE.serialize(split);
+		final byte[] ser2 = FileSourceSplitSerializer.INSTANCE.serialize(split);
+
+		assertSame(ser1, ser2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utils
+	// ------------------------------------------------------------------------
+
+	private static FileSourceSplit serializeAndDeserialize(FileSourceSplit split) throws IOException {
+		final FileSourceSplitSerializer serializer = new FileSourceSplitSerializer();
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, split);
+		return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+	}
+
+	static void assertSplitsEqual(FileSourceSplit expected, FileSourceSplit actual) {
+		assertEquals(expected.splitId(), actual.splitId());
+		assertEquals(expected.path(), actual.path());
+		assertEquals(expected.offset(), actual.offset());
+		assertEquals(expected.length(), actual.length());
+		assertArrayEquals(expected.hostnames(), actual.hostnames());
+		assertEquals(expected.getReaderPosition(), actual.getReaderPosition());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java
new file mode 100644
index 0000000..95c704a
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link FileSourceSplitState}.
+ */
+public class FileSourceSplitStateTest {
+
+	@Test
+	public void testRoundTripWithoutModification() {
+		final FileSourceSplit split = getTestSplit();
+		final FileSourceSplitState state = new FileSourceSplitState(split);
+
+		final FileSourceSplit resultSplit = state.toFileSourceSplit();
+
+		assertEquals(split.getReaderPosition(), resultSplit.getReaderPosition());
+	}
+
+	@Test
+	public void testStateStartsWithSplitValues() {
+		final FileSourceSplit split = getTestSplit(new CheckpointedPosition(123L, 456L));
+		final FileSourceSplitState state = new FileSourceSplitState(split);
+
+		assertEquals(123L, state.getOffset());
+		assertEquals(456L, state.getRecordsToSkipAfterOffset());
+	}
+
+	@Test
+	public void testNewSplitTakesModifiedOffsetAndCount() {
+		final FileSourceSplit split = getTestSplit();
+		final FileSourceSplitState state = new FileSourceSplitState(split);
+
+		state.setPosition(1234L, 7566L);
+		final Optional<CheckpointedPosition> position = state.toFileSourceSplit().getReaderPosition();
+
+		assertTrue(position.isPresent());
+		assertEquals(new CheckpointedPosition(1234L, 7566L), position.get());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static FileSourceSplit getTestSplit() {
+		return getTestSplit(null);
+	}
+
+	private static FileSourceSplit getTestSplit(CheckpointedPosition position) {
+		return new FileSourceSplit(
+				"test-id",
+				new Path("file:/some/random/path"),
+				17,
+				121,
+				new String[] {"localhost"},
+				position);
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java
new file mode 100644
index 0000000..e58ef6f
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for the {@link FileSourceSplit}.
+ */
+public class FileSourceSplitTest {
+
+	@Test(expected = IllegalArgumentException.class)
+	public void noNullHostsAllowed() {
+		new FileSourceSplit("id", new Path("file:/some/random/path"), 0, 10, "host1", null, "host2");
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
new file mode 100644
index 0000000..8876e7e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * MiniCluster-based integration test for the {@link FileSource}.
+ */
+public class FileSourceTextLinesITCase extends TestLogger {
+
+	private static final int PARALLELISM = 4;
+
+	@ClassRule
+	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+	@ClassRule
+	public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
+		new MiniClusterResourceConfiguration.Builder()
+			.setNumberTaskManagers(1)
+			.setNumberSlotsPerTaskManager(PARALLELISM)
+			.build());
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This test runs a job reading bounded input with a stream record format (text lines).
+	 */
+	@Test
+	public void testBoundedTextFileSource() throws Exception {
+		final File testDir = TMP_FOLDER.newFolder();
+
+		// our main test data
+		writeAllFiles(testDir);
+
+		// write some junk to hidden files to test that common hidden file patterns are filtered by default
+		writeHiddenJunkFiles(testDir);
+
+		final FileSource<String> source = FileSource
+				.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+				.build();
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		final DataStream<String> stream = env.fromSource(
+				source,
+				WatermarkStrategy.noWatermarks(),
+				"file-source");
+
+		final List<String> result = DataStreamUtils.collectBoundedStream(stream, "Bounded TextFiles Test");
+
+		verifyResult(result);
+	}
+
+	/**
+	 * This test runs a job reading continuous input (files appearing over time)
+	 * with a stream record format (text lines).
+	 */
+	@Test
+	public void testContinuousTextFileSource() throws Exception {
+		final File testDir = TMP_FOLDER.newFolder();
+
+		final FileSource<String> source = FileSource
+				.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+				.monitorContinuously(Duration.ofMillis(5))
+				.build();
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(PARALLELISM);
+
+		final DataStream<String> stream = env.fromSource(
+				source,
+				WatermarkStrategy.noWatermarks(),
+				"file-source");
+
+		final DataStreamUtils.ClientAndIterator<String> client =
+				DataStreamUtils.collectWithClient(stream, "Continuous TextFiles Monitoring Test");
+
+		// write one file, execute, and wait for its result
+		// that way we know that the application was running and the source has
+		// done its first chunk of work already
+
+		final int numLinesFirst = LINES_PER_FILE[0].length;
+		final int numLinesAfter = LINES.length - numLinesFirst;
+
+		writeFile(testDir, 0);
+		final List<String> result1 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesFirst);
+
+		// write the remaining files over time, after that collect the final result
+		for (int i = 1; i < LINES_PER_FILE.length; i++) {
+			Thread.sleep(10);
+			writeFile(testDir, i);
+		}
+
+		final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesAfter);
+
+		// shut down the job, now that we have all the results we expected.
+		client.client.cancel().get();
+
+		result1.addAll(result2);
+		verifyResult(result1);
+	}
+
+	// ------------------------------------------------------------------------
+	//  verification
+	// ------------------------------------------------------------------------
+
+	private static void verifyResult(List<String> lines) {
+		final String[] expected = Arrays.copyOf(LINES, LINES.length);
+		final String[] actual = lines.toArray(new String[0]);
+
+		Arrays.sort(expected);
+		Arrays.sort(actual);
+
+		assertThat(actual, equalTo(expected));
+	}
+
+	// ------------------------------------------------------------------------
+	//  test data
+	// ------------------------------------------------------------------------
+
+	private static final String[] FILE_PATHS = new String[] {
+			"text.2",
+			"nested1/text.1",
+			"text.1",
+			"text.3",
+			"nested2/nested21/text",
+			"nested1/text.2",
+			"nested2/text"};
+
+	private static final String[] HIDDEN_JUNK_PATHS = new String[] {
+			// all file names here start with '.' or '_'
+			"_something",
+			".junk",
+			"nested1/.somefile",
+			"othernested/_ignoredfile",
+			"_nested/file",
+			"nested1/.intermediate/somefile"};
+
+	private static final String[] LINES = new String[] {
+		"To be, or not to be,--that is the question:--",
+		"Whether 'tis nobler in the mind to suffer",
+		"The slings and arrows of outrageous fortune",
+		"Or to take arms against a sea of troubles,",
+		"And by opposing end them?--To die,--to sleep,--",
+		"No more; and by a sleep to say we end",
+		"The heartache, and the thousand natural shocks",
+		"That flesh is heir to,--'tis a consummation",
+		"Devoutly to be wish'd. To die,--to sleep;--",
+		"To sleep! perchance to dream:--ay, there's the rub;",
+		"For in that sleep of death what dreams may come,",
+		"When we have shuffled off this mortal coil,",
+		"Must give us pause: there's the respect",
+		"That makes calamity of so long life;",
+		"For who would bear the whips and scorns of time,",
+		"The oppressor's wrong, the proud man's contumely,",
+		"The pangs of despis'd love, the law's delay,",
+		"The insolence of office, and the spurns",
+		"That patient merit of the unworthy takes,",
+		"When he himself might his quietus make",
+		"With a bare bodkin? who would these fardels bear,",
+		"To grunt and sweat under a weary life,",
+		"But that the dread of something after death,--",
+		"The undiscover'd country, from whose bourn",
+		"No traveller returns,--puzzles the will,",
+		"And makes us rather bear those ills we have",
+		"Than fly to others that we know not of?",
+		"Thus conscience does make cowards of us all;",
+		"And thus the native hue of resolution",
+		"Is sicklied o'er with the pale cast of thought;",
+		"And enterprises of great pith and moment,",
+		"With this regard, their currents turn awry,",
+		"And lose the name of action.--Soft you now!",
+		"The fair Ophelia!--Nymph, in thy orisons",
+		"Be all my sins remember'd."
+	};
+
+	private static final String[][] LINES_PER_FILE = splitLinesForFiles();
+
+	private static String[][] splitLinesForFiles() {
+		final String[][] result = new String[FILE_PATHS.length][];
+
+		final int linesPerFile = LINES.length / FILE_PATHS.length;
+		final int linesForLastFile = LINES.length - ((FILE_PATHS.length - 1) * linesPerFile);
+
+		int pos = 0;
+		for (int i = 0; i < FILE_PATHS.length - 1; i++) {
+			String[] lines = new String[linesPerFile];
+			result[i] = lines;
+			for (int k = 0; k < lines.length; k++) {
+				lines[k] = LINES[pos++];
+			}
+		}
+		String[] lines = new String[linesForLastFile];
+		result[result.length - 1] = lines;
+		for (int k = 0; k < lines.length; k++) {
+			lines[k] = LINES[pos++];
+		}
+		return result;
+	}
+
+	private static void writeFile(File testDir, int num) throws IOException {
+		final File file = new File(testDir, FILE_PATHS[num]);
+		final File parent = file.getParentFile();
+		assertTrue(parent.mkdirs() || parent.exists());
+
+		try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
+			for (String line : LINES_PER_FILE[num]) {
+				writer.println(line);
+			}
+		}
+	}
+
+	private static void writeAllFiles(File testDir) throws IOException {
+		for (int i = 0; i < FILE_PATHS.length; i++) {
+			writeFile(testDir, i);
+		}
+	}
+
+	private static void writeHiddenJunkFiles(File testDir) throws IOException {
+		for (String junkPath : HIDDEN_JUNK_PATHS) {
+			final File file = new File(testDir, junkPath);
+			final File parent = file.getParentFile();
+			assertTrue(parent.mkdirs() || parent.exists());
+
+			try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
+				writer.println("This should not end up in the test result.");
+				writer.println("Foo bar bazzl junk");
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
new file mode 100644
index 0000000..bc89dce
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Unit tests for the {@link PendingSplitsCheckpointSerializer}.
+ */
+public class PendingSplitsCheckpointSerializerTest {
+
+	@Test
+	public void serializeEmptyCheckpoint() throws Exception {
+		final PendingSplitsCheckpoint checkpoint =
+				PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
+
+		final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+		assertCheckpointsEqual(checkpoint, deSerialized);
+	}
+
+	@Test
+	public void serializeSomeSplits() throws Exception {
+		final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+				Arrays.asList(testSplit1(), testSplit2(), testSplit3()));
+
+		final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+		assertCheckpointsEqual(checkpoint, deSerialized);
+	}
+
+	@Test
+	public void serializeSplitsAndProcessedPaths() throws Exception {
+		final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+				Arrays.asList(testSplit1(), testSplit2(), testSplit3()),
+				Arrays.asList(new Path("file:/some/path"), new Path("s3://bucket/key/and/path"), new Path("hdfs://namenode:12345/path")));
+
+		final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+		assertCheckpointsEqual(checkpoint, deSerialized);
+	}
+
+	@Test
+	public void repeatedSerialization() throws Exception {
+		final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+			Arrays.asList(testSplit3(), testSplit1()));
+
+		serializeAndDeserialize(checkpoint);
+		serializeAndDeserialize(checkpoint);
+		final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+		assertCheckpointsEqual(checkpoint, deSerialized);
+	}
+
+	@Test
+	public void repeatedSerializationCaches() throws Exception {
+		final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+				Collections.singletonList(testSplit2()));
+
+		final byte[] ser1 = PendingSplitsCheckpointSerializer.INSTANCE.serialize(checkpoint);
+		final byte[] ser2 = PendingSplitsCheckpointSerializer.INSTANCE.serialize(checkpoint);
+
+		assertSame(ser1, ser2);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utils
+	// ------------------------------------------------------------------------
+
+	private static FileSourceSplit testSplit1() {
+		return new FileSourceSplit(
+				"random-id",
+				new Path("hdfs://namenode:14565/some/path/to/a/file"),
+				100_000_000,
+				64_000_000,
+				"host1", "host2", "host3");
+	}
+
+	private static FileSourceSplit testSplit2() {
+		return new FileSourceSplit(
+				"some-id",
+				new Path("file:/some/path/to/a/file"),
+				0,
+				0);
+	}
+
+	private static FileSourceSplit testSplit3() {
+		return new FileSourceSplit(
+				"an-id",
+				new Path("s3://some-bucket/key/to/the/object"),
+				0,
+				1234567);
+	}
+
+	private static PendingSplitsCheckpoint serializeAndDeserialize(PendingSplitsCheckpoint checkpoint) throws IOException {
+		final PendingSplitsCheckpointSerializer serializer = new PendingSplitsCheckpointSerializer();
+		final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, checkpoint);
+		return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+	}
+
+	private static void assertCheckpointsEqual(PendingSplitsCheckpoint expected, PendingSplitsCheckpoint actual) {
+		assertOrderedCollectionEquals(expected.getSplits(), actual.getSplits(), FileSourceSplitSerializerTest::assertSplitsEqual);
+
+		assertOrderedCollectionEquals(
+				expected.getAlreadyProcessedPaths(), actual.getAlreadyProcessedPaths(), Assert::assertEquals);
+	}
+
+	private static <E> void assertOrderedCollectionEquals(
+			Collection<E> expected,
+			Collection<E> actual,
+			BiConsumer<E, E> equalityAsserter) {
+
+		assertEquals(expected.size(), actual.size());
+		final Iterator<E> expectedIter = expected.iterator();
+		final Iterator<E> actualIter = actual.iterator();
+		while (expectedIter.hasNext()) {
+			equalityAsserter.accept(expectedIter.next(), actualIter.next());
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java
new file mode 100644
index 0000000..24d0414
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Unit tests for the {@link BlockSplittingRecursiveEnumerator}.
+ */
+public class BlockSplittingRecursiveEnumeratorTest extends NonSplittingRecursiveEnumeratorTest {
+
+	@Test
+	@Override
+	public void testFileWithMultipleBlocks() throws Exception {
+		final Path testPath = new Path("testfs:///dir/file");
+		testFs = TestingFileSystem.createForFileStatus(
+				"testfs",
+				TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 1000L,
+						new TestingFileSystem.TestBlockLocation(0L, 100L, "host1", "host2"),
+						new TestingFileSystem.TestBlockLocation(100L, 520L, "host2", "host3"),
+						new TestingFileSystem.TestBlockLocation(620L, 380L, "host3", "host4")));
+		testFs.register();
+
+		final BlockSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+			new Path[] { new Path("testfs:///dir")}, 0);
+
+		final Collection<FileSourceSplit> expected = Arrays.asList(
+				new FileSourceSplit("ignoredId", testPath, 0L, 100L, "host1", "host2"),
+				new FileSourceSplit("ignoredId", testPath, 100L, 520L, "host2", "host3"),
+				new FileSourceSplit("ignoredId", testPath, 620L, 380L, "host3", "host4"));
+
+		assertSplitsEqual(expected, splits);
+	}
+
+	protected BlockSplittingRecursiveEnumerator createEnumerator() {
+		return new BlockSplittingRecursiveEnumerator();
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java
new file mode 100644
index 0000000..9837975
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link NonSplittingRecursiveEnumerator}.
+ */
+public class NonSplittingRecursiveEnumeratorTest {
+
+	/** Testing file system reference, to be cleaned up in an @After method. That way it also gets
+	 * cleaned up on a test failure, without needing finally clauses in every test. */
+	protected TestingFileSystem testFs;
+
+	@After
+	public void unregisterTestFs() throws Exception {
+		if (testFs != null) {
+			testFs.unregister();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testIncludeFilesFromNestedDirectories() throws Exception {
+		final Path[] testPaths = new Path[] {
+				new Path("testfs:///dir/file1"),
+				new Path("testfs:///dir/nested/file.out"),
+				new Path("testfs:///dir/nested/anotherfile.txt")};
+		testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///dir")}, 1);
+
+		assertThat(toPaths(splits), containsInAnyOrder(testPaths));
+	}
+
+	@Test
+	public void testDefaultHiddenFilesFilter() throws Exception {
+		final Path[] testPaths = new Path[] {
+				new Path("testfs:///visiblefile"),
+				new Path("testfs:///.hiddenfile1"),
+				new Path("testfs:///_hiddenfile2")};
+		testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///")}, 1);
+
+		assertEquals(Collections.singletonList(new Path("testfs:///visiblefile")), toPaths(splits));
+	}
+
+	@Test
+	public void testHiddenDirectories() throws Exception {
+		final Path[] testPaths = new Path[] {
+				new Path("testfs:///dir/visiblefile"),
+				new Path("testfs:///dir/.hiddendir/file"),
+				new Path("testfs:///_notvisible/afile")};
+		testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///")}, 1);
+
+		assertEquals(Collections.singletonList(new Path("testfs:///dir/visiblefile")), toPaths(splits));
+	}
+
+	@Test
+	public void testFilesWithNoBlockInfo() throws Exception {
+		final Path testPath = new Path("testfs:///dir/file1");
+		testFs = TestingFileSystem.createForFileStatus(
+				"testfs",
+				TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 12345L));
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///dir")}, 0);
+
+		assertEquals(1, splits.size());
+		assertSplitsEqual(
+				new FileSourceSplit("ignoredId", testPath, 0L, 12345L),
+				splits.iterator().next());
+	}
+
+	@Test
+	public void testFileWithIncorrectBlocks() throws Exception {
+		final Path testPath = new Path("testfs:///testdir/testfile");
+
+		testFs = TestingFileSystem.createForFileStatus(
+				"testfs",
+				TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 10000L,
+						new TestingFileSystem.TestBlockLocation(0L, 1000L),
+						new TestingFileSystem.TestBlockLocation(2000L, 1000L)));
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///testdir")}, 0);
+
+		assertEquals(1, splits.size());
+		assertSplitsEqual(
+				new FileSourceSplit("ignoredId", testPath, 0L, 10000L),
+				splits.iterator().next());
+	}
+
+	@Test
+	public void testFileWithMultipleBlocks() throws Exception {
+		final Path testPath = new Path("testfs:///dir/file");
+		testFs = TestingFileSystem.createForFileStatus(
+				"testfs",
+				TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 1000L,
+						new TestingFileSystem.TestBlockLocation(0L, 100L, "host1", "host2"),
+						new TestingFileSystem.TestBlockLocation(100L, 520L, "host2", "host3"),
+						new TestingFileSystem.TestBlockLocation(620L, 380L, "host3", "host4")));
+		testFs.register();
+
+		final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+		final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+				new Path[] { new Path("testfs:///dir")}, 0);
+
+		assertSplitsEqual(
+				new FileSourceSplit("ignoredId", testPath, 0L, 1000L, "host1", "host2", "host3", "host4"),
+				splits.iterator().next());
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The instantiation of the enumerator is overridable so that we can reuse these tests for sub-classes.
+	 */
+	protected NonSplittingRecursiveEnumerator createEnumerator() {
+		return new NonSplittingRecursiveEnumerator();
+	}
+
+	// ------------------------------------------------------------------------
+
+	protected static void assertSplitsEqual(final FileSourceSplit expected, final FileSourceSplit actual) {
+		assertEquals(expected.path(), actual.path());
+		assertEquals(expected.offset(), actual.offset());
+		assertEquals(expected.length(), actual.length());
+		assertArrayEquals(expected.hostnames(), actual.hostnames());
+	}
+
+	protected static void assertSplitsEqual(
+			final Collection<FileSourceSplit> expected,
+			final Collection<FileSourceSplit> actual) {
+
+		assertEquals(expected.size(), actual.size());
+
+		final ArrayList<FileSourceSplit> expectedCopy = new ArrayList<>(expected);
+		final ArrayList<FileSourceSplit> actualCopy = new ArrayList<>(actual);
+		expectedCopy.sort(NonSplittingRecursiveEnumeratorTest::compareFileSourceSplit);
+		actualCopy.sort(NonSplittingRecursiveEnumeratorTest::compareFileSourceSplit);
+
+		for (int i = 0; i < expectedCopy.size(); i++) {
+			assertSplitsEqual(expectedCopy.get(i), actualCopy.get(i));
+		}
+	}
+
+	protected static Collection<Path> toPaths(Collection<FileSourceSplit> splits) {
+		return splits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+	}
+
+	private static int compareFileSourceSplit(FileSourceSplit a, FileSourceSplit b) {
+		return Long.compare(a.offset(), b.offset());
+	}
+}
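
For orientation, the enumerator exercised by these tests can also be driven directly. The following is a minimal, illustrative sketch (not part of the patch) that lists the splits produced for a local directory; the input path is a hypothetical example.

	package org.apache.flink.connector.file.src.enumerate;

	import org.apache.flink.connector.file.src.FileSourceSplit;
	import org.apache.flink.core.fs.Path;

	import java.io.IOException;
	import java.util.Collection;

	class EnumerateSplitsSketch {

		public static void main(String[] args) throws IOException {
			// one split per file, recursing into nested directories; hidden files and
			// directories are skipped by the default file filter
			final NonSplittingRecursiveEnumerator enumerator = new NonSplittingRecursiveEnumerator();

			// hypothetical input directory; the second argument is the minimum desired number of splits
			final Collection<FileSourceSplit> splits =
					enumerator.enumerateSplits(new Path[] {new Path("file:///tmp/input-dir")}, 4);

			for (FileSourceSplit split : splits) {
				System.out.println(split.path() + " [" + split.offset() + ", +" + split.length() + ")");
			}
		}
	}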
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java
new file mode 100644
index 0000000..ea59379
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Base test class for the format adapters, shared by {@link StreamFormatAdapterTest} and {@link FileRecordFormatAdapterTest}.
+ */
+public abstract class AdapterTestBase<FormatT> {
+
+	@ClassRule
+	public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
+
+	protected static final int NUM_NUMBERS = 100;
+	protected static final long FILE_LEN = 4 * NUM_NUMBERS;
+
+	protected static Path testPath;
+
+	@BeforeClass
+	public static void writeTestFile() throws IOException {
+		final File testFile = new File(TMP_DIR.getRoot(), "testFile");
+		testPath = Path.fromLocalFile(testFile);
+
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(testFile))) {
+			for (int i = 0; i < NUM_NUMBERS; i++) {
+				out.writeInt(i);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  format specific instantiation
+	// ------------------------------------------------------------------------
+
+	protected abstract FormatT createCheckpointedFormat();
+
+	protected abstract FormatT createNonCheckpointedFormat();
+
+	protected abstract FormatT createFormatFailingInInstantiation();
+
+	protected abstract BulkFormat<Integer> wrapWithAdapter(FormatT format);
+
+	// ------------------------------------------------------------------------
+	//  shared tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testRecoverCheckpointedFormatOneSplit() throws IOException {
+		testReading(createCheckpointedFormat(), 1, 5, 44);
+	}
+
+	@Test
+	public void testRecoverCheckpointedFormatMultipleSplits() throws IOException {
+		testReading(createCheckpointedFormat(), 3, 11, 33, 56);
+	}
+
+	@Test
+	public void testRecoverNonCheckpointedFormatOneSplit() throws IOException {
+		testReading(createNonCheckpointedFormat(), 1, 5, 44);
+	}
+
+	private void testReading(FormatT format, int numSplits, int... recoverAfterRecords) throws IOException {
+		// add the end boundary for recovery
+		final int[] boundaries = Arrays.copyOf(recoverAfterRecords, recoverAfterRecords.length + 1);
+		boundaries[boundaries.length - 1] = NUM_NUMBERS;
+
+		// set a fetch size so that we get three records per fetch
+		final Configuration config = new Configuration();
+		config.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(10));
+
+		final BulkFormat<Integer> adapter = wrapWithAdapter(format);
+		final Queue<FileSourceSplit> splits = buildSplits(numSplits);
+		final List<Integer> result = new ArrayList<>();
+
+		FileSourceSplit currentSplit = null;
+		BulkFormat.Reader<Integer> currentReader = null;
+
+		for (int nextRecordToRecover : boundaries) {
+			final Tuple2<FileSourceSplit, CheckpointedPosition> toRecoverFrom = readNumbers(
+				currentReader, currentSplit,
+				adapter, splits, config,
+				result,
+				nextRecordToRecover - result.size());
+
+			currentSplit = toRecoverFrom.f0;
+			currentReader = adapter.restoreReader(config, currentSplit.path(), currentSplit.offset(), currentSplit.length(), toRecoverFrom.f1);
+		}
+
+		verifyIntListResult(result);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testClosesStreamIfReaderCreationFails() throws Exception {
+		// setup
+		final Path testPath = new Path("testFs:///testpath-1");
+		final CloseTestingInputStream in = new CloseTestingInputStream();
+		final TestingFileSystem testFs = TestingFileSystem.createForFileStatus("testFs",
+				TestingFileSystem.TestFileStatus.forFileWithStream(testPath, 1024, in));
+		testFs.register();
+
+		// test
+		final BulkFormat<Integer> adapter = wrapWithAdapter(createFormatFailingInInstantiation());
+		try {
+			adapter.createReader(new Configuration(), testPath, 0, 1024);
+		} catch (IOException ignored) {}
+
+		// assertions
+		assertTrue(in.closed);
+
+		// cleanup
+		testFs.unregister();
+	}
+
+	@Test
+	public void testClosesStreamIfReaderRestoreFails() throws Exception {
+		// setup
+		final Path testPath = new Path("testFs:///testpath-1");
+		final CloseTestingInputStream in = new CloseTestingInputStream();
+		final TestingFileSystem testFs = TestingFileSystem.createForFileStatus("testFs",
+			TestingFileSystem.TestFileStatus.forFileWithStream(testPath, 1024, in));
+		testFs.register();
+
+		// test
+		final BulkFormat<Integer> adapter = wrapWithAdapter(createFormatFailingInInstantiation());
+		try {
+			adapter.restoreReader(
+					new Configuration(), testPath, 0, 1024,
+					new CheckpointedPosition(0L, 5L));
+		} catch (IOException ignored) {}
+
+		// assertions
+		assertTrue(in.closed);
+
+		// cleanup
+		testFs.unregister();
+	}
+
+	// ------------------------------------------------------------------------
+	//  test helpers
+	// ------------------------------------------------------------------------
+
+	protected static void verifyIntListResult(List<Integer> result) {
+		assertEquals("wrong result size", NUM_NUMBERS, result.size());
+		int nextExpected = 0;
+		for (int next : result) {
+			if (next != nextExpected++) {
+				fail("Wrong result: " + result);
+			}
+		}
+	}
+
+	protected static void readNumbers(BulkFormat.Reader<Integer> reader, List<Integer> result, int num) throws IOException {
+		readNumbers(reader, null, null, null, null, result, num);
+	}
+
+	protected static Tuple2<FileSourceSplit, CheckpointedPosition> readNumbers(
+			BulkFormat.Reader<Integer> currentReader,
+			FileSourceSplit currentSplit,
+			BulkFormat<Integer> format,
+			Queue<FileSourceSplit> moreSplits,
+			Configuration config,
+			List<Integer> result,
+			int num) throws IOException {
+
+		long offset = Long.MIN_VALUE;
+		long skip = Long.MIN_VALUE;
+
+		// loop across splits
+		while (num > 0) {
+			if (currentReader == null) {
+				currentSplit = moreSplits.poll();
+				assertNotNull(currentSplit);
+				currentReader = format.createReader(config, currentSplit.path(), currentSplit.offset(), currentSplit.length());
+			}
+
+			// loop across batches
+			BulkFormat.RecordIterator<Integer> nextBatch;
+			while (num > 0 && (nextBatch = currentReader.readBatch()) != null) {
+
+				// loop across records in the batch
+				RecordAndPosition<Integer> next;
+				while (num > 0 && (next = nextBatch.next()) != null) {
+					num--;
+					result.add(next.getRecord());
+					offset = next.getOffset();
+					skip = next.getRecordSkipCount();
+				}
+			}
+
+			currentReader.close();
+			currentReader = null;
+		}
+
+		return new Tuple2<>(currentSplit, new CheckpointedPosition(offset, skip));
+	}
+
+	static Queue<FileSourceSplit> buildSplits(int numSplits) {
+		final Queue<FileSourceSplit> splits = new ArrayDeque<>();
+		final long rangeForSplit = FILE_LEN / numSplits;
+
+		for (int i = 0; i < numSplits - 1; i++) {
+			splits.add(new FileSourceSplit("ID-" + i, testPath, i * rangeForSplit, rangeForSplit));
+		}
+		final long startOfLast = (numSplits - 1) * rangeForSplit;
+		splits.add(new FileSourceSplit("ID-" + (numSplits - 1), testPath, startOfLast, FILE_LEN - startOfLast));
+		return splits;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test Mocks and Stubs
+	// ------------------------------------------------------------------------
+
+	private static class CloseTestingInputStream extends FSDataInputStream {
+
+		boolean closed;
+
+		@Override
+		public void seek(long desired) throws IOException {}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public int read() throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void close() throws IOException {
+			closed = true;
+		}
+	}
+}
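
The readNumbers helper above reflects the general consumption pattern for a BulkFormat reader: create a reader for a split, pull batches with readBatch(), and drain each batch record by record. A minimal sketch of that loop follows (not part of the patch; the concrete format and split are supplied by the caller):

	package org.apache.flink.connector.file.src.impl;

	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.connector.file.src.FileSourceSplit;
	import org.apache.flink.connector.file.src.reader.BulkFormat;
	import org.apache.flink.connector.file.src.util.RecordAndPosition;

	import java.io.IOException;
	import java.util.function.Consumer;

	class BulkFormatReadSketch {

		/** Reads all records of one split and hands them to the consumer. */
		static <T> void readSplit(BulkFormat<T> format, FileSourceSplit split, Consumer<T> consumer) throws IOException {
			final Configuration config = new Configuration();
			final BulkFormat.Reader<T> reader =
					format.createReader(config, split.path(), split.offset(), split.length());
			try {
				BulkFormat.RecordIterator<T> batch;
				while ((batch = reader.readBatch()) != null) {
					RecordAndPosition<T> record;
					while ((record = batch.next()) != null) {
						// getOffset() / getRecordSkipCount() describe the position to checkpoint for recovery
						consumer.accept(record.getRecord());
					}
				}
			} finally {
				reader.close();
			}
		}
	}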
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java
new file mode 100644
index 0000000..779f38e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit and behavior tests for the {@link FileRecordFormatAdapter}.
+ */
+@SuppressWarnings("serial")
+public class FileRecordFormatAdapterTest extends AdapterTestBase<FileRecordFormat<Integer>> {
+
+	@Override
+	protected FileRecordFormat<Integer> createCheckpointedFormat() {
+		return new IntFileRecordFormat(true);
+	}
+
+	@Override
+	protected FileRecordFormat<Integer> createNonCheckpointedFormat() {
+		return new IntFileRecordFormat(false);
+	}
+
+	@Override
+	protected FileRecordFormat<Integer> createFormatFailingInInstantiation() {
+		return new FailingInstantiationFormat();
+	}
+
+	@Override
+	protected BulkFormat<Integer> wrapWithAdapter(FileRecordFormat<Integer> format) {
+		return new FileRecordFormatAdapter<>(format);
+	}
+
+	// ------------------------------------------------------------------------
+	//  not applicable tests
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void testClosesStreamIfReaderCreationFails() throws Exception {
+		// ignore this test
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class IntFileRecordFormat implements FileRecordFormat<Integer> {
+
+		private final boolean checkpointed;
+
+		IntFileRecordFormat(boolean checkpointed) {
+			this.checkpointed = checkpointed;
+		}
+
+		@Override
+		public Reader<Integer> createReader(
+				Configuration config,
+				Path filePath,
+				long splitOffset,
+				long splitLength) throws IOException {
+
+			final FileSystem fs = filePath.getFileSystem();
+			final FileStatus status = fs.getFileStatus(filePath);
+			final FSDataInputStream in = fs.open(filePath);
+
+			final long fileLen = status.getLen();
+			final long splitEnd = splitOffset + splitLength;
+
+			assertEquals("invalid file length", 0, fileLen % 4);
+
+			// Round the split boundaries up to the next integer boundary. To simulate common split
+			// behavior, we round up even when we are already exactly on a boundary; the only
+			// exceptions are the very start and the very end of the file.
+			final long start = splitOffset == 0L ? 0L : splitOffset + 4 - splitOffset % 4;
+			final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+			in.seek(start);
+
+			return new TestIntReader(in, end, checkpointed);
+		}
+
+		@Override
+		public Reader<Integer> restoreReader(
+				Configuration config,
+				Path filePath,
+				long restoredOffset,
+				long splitOffset,
+				long splitLength) throws IOException {
+
+			final FileSystem fs = filePath.getFileSystem();
+			final FileStatus status = fs.getFileStatus(filePath);
+			final FSDataInputStream in = fs.open(filePath);
+
+			final long fileLen = status.getLen();
+			final long splitEnd = splitOffset + splitLength;
+
+			assertEquals("invalid file length", 0, fileLen % 4);
+
+			// round end position to the next integer boundary
+			final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+			// no rounding of checkpointed offset
+			in.seek(restoredOffset);
+			return new TestIntReader(in, end, checkpointed);
+		}
+
+		@Override
+		public boolean isSplittable() {
+			return true;
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+	}
+
+	private static final class FailingInstantiationFormat implements FileRecordFormat<Integer> {
+
+		@Override
+		public Reader<Integer> createReader(
+				Configuration config,
+				Path filePath,
+				long splitOffset,
+				long splitLength) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Reader<Integer> restoreReader(
+				Configuration config,
+				Path filePath,
+				long restoredOffset,
+				long splitOffset,
+				long splitLength) throws IOException {
+
+			final FSDataInputStream in = filePath.getFileSystem().open(filePath);
+			return new FailingReader(in);
+		}
+
+		@Override
+		public boolean isSplittable() {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+
+		private static final class FailingReader implements FileRecordFormat.Reader<Integer> {
+
+			private final FSDataInputStream stream;
+
+			FailingReader(FSDataInputStream stream) {
+				this.stream = stream;
+			}
+
+			@Nullable
+			@Override
+			public Integer read() throws IOException {
+				throw new IOException("test exception");
+			}
+
+			@Override
+			public void close() throws IOException {
+				stream.close();
+			}
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java
new file mode 100644
index 0000000..42bc443
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.connector.file.src.util.SingletonResultIterator;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link FileRecords} class.
+ */
+public class FileRecordsTest {
+
+	@Test
+	public void testEmptySplits() {
+		final String split = "empty";
+		final FileRecords<Object> records = FileRecords.finishedSplit(split);
+
+		assertEquals(Collections.singleton(split), records.finishedSplits());
+	}
+
+	@Test
+	public void testMoveToFirstSplit() {
+		final String splitId = "splitId";
+		final FileRecords<Object> records = FileRecords.forRecords(splitId, new SingletonResultIterator<>());
+
+		final String firstSplitId = records.nextSplit();
+
+		assertEquals(splitId, firstSplitId);
+	}
+
+	@Test
+	public void testMoveToSecondSplit() {
+		final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+		records.nextSplit();
+
+		final String secondSplitId = records.nextSplit();
+
+		assertNull(secondSplitId);
+	}
+
+	@Test
+	public void testRecordsFromFirstSplit() {
+		final SingletonResultIterator<String> iter = new SingletonResultIterator<>();
+		iter.set("test", 18, 99);
+		final FileRecords<String> records = FileRecords.forRecords("splitId", iter);
+		records.nextSplit();
+
+		final RecordAndPosition<String> recAndPos = records.nextRecordFromSplit();
+
+		assertEquals("test", recAndPos.getRecord());
+		assertEquals(18, recAndPos.getOffset());
+		assertEquals(99, recAndPos.getRecordSkipCount());
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testRecordsInitiallyIllegal() {
+		final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+
+		records.nextRecordFromSplit();
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testRecordsOnSecondSplitIllegal() {
+		final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+		records.nextSplit();
+		records.nextSplit();
+
+		records.nextRecordFromSplit();
+	}
+
+	@Test
+	public void testRecycleExhaustedBatch() {
+		final AtomicBoolean recycled = new AtomicBoolean(false);
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+		iter.set(new Object(), 1L, 2L);
+
+		final FileRecords<Object> records = FileRecords.forRecords("test split", iter);
+		records.nextSplit();
+		records.nextRecordFromSplit();
+
+		// make sure we exhausted the iterator
+		assertNull(records.nextRecordFromSplit());
+		assertNull(records.nextSplit());
+
+		records.recycle();
+		assertTrue(recycled.get());
+	}
+
+	@Test
+	public void testRecycleNonExhaustedBatch() {
+		final AtomicBoolean recycled = new AtomicBoolean(false);
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+		iter.set(new Object(), 1L, 2L);
+
+		final FileRecords<Object> records = FileRecords.forRecords("test split", iter);
+		records.nextSplit();
+
+		records.recycle();
+		assertTrue(recycled.get());
+	}
+}
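
Taken together, these tests encode the intended consumption protocol for FileRecords: advance to the split with nextSplit(), drain it with nextRecordFromSplit(), and call recycle() once the batch is done. A minimal sketch of that protocol (not part of the patch), reusing the SingletonResultIterator from the utilities:

	package org.apache.flink.connector.file.src.impl;

	import org.apache.flink.connector.file.src.util.RecordAndPosition;
	import org.apache.flink.connector.file.src.util.SingletonResultIterator;

	class FileRecordsUsageSketch {

		public static void main(String[] args) {
			final SingletonResultIterator<String> iter = new SingletonResultIterator<>();
			iter.set("hello", 0L, 1L); // record, checkpointed offset, records to skip after that offset

			final FileRecords<String> records = FileRecords.forRecords("split-0", iter);

			String splitId;
			while ((splitId = records.nextSplit()) != null) {
				RecordAndPosition<String> rec;
				while ((rec = records.nextRecordFromSplit()) != null) {
					System.out.println(splitId + ": " + rec.getRecord());
				}
			}
			records.recycle(); // return the batch, e.g. to a buffer pool
		}
	}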
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java
new file mode 100644
index 0000000..29e74b1
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit and behavior tests for the {@link StreamFormatAdapter}.
+ */
+@SuppressWarnings("serial")
+public class StreamFormatAdapterTest extends AdapterTestBase<StreamFormat<Integer>> {
+
+	// ------------------------------------------------------------------------
+	//  Factories for Shared Tests
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected StreamFormat<Integer> createCheckpointedFormat() {
+		return new CheckpointedIntFormat();
+	}
+
+	@Override
+	protected StreamFormat<Integer> createNonCheckpointedFormat() {
+		return new NonCheckpointedIntFormat();
+	}
+
+	@Override
+	protected StreamFormat<Integer> createFormatFailingInInstantiation() {
+		return new FailingInstantiationFormat();
+	}
+
+	@Override
+	protected BulkFormat<Integer> wrapWithAdapter(StreamFormat<Integer> format) {
+		return new StreamFormatAdapter<>(format);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Additional Unit Tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testReadSmallBatchSize() throws IOException {
+		simpleReadTest(1);
+	}
+
+	@Test
+	public void testBatchSizeMatchesOneRecord() throws IOException {
+		simpleReadTest(4);
+	}
+
+	@Test
+	public void testBatchSizeIsRecordMultiple() throws IOException {
+		simpleReadTest(20);
+	}
+
+	private void simpleReadTest(int batchSize) throws IOException {
+		final Configuration config = new Configuration();
+		config.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(batchSize));
+		final StreamFormatAdapter<Integer> format = new StreamFormatAdapter<>(new CheckpointedIntFormat());
+		final BulkFormat.Reader<Integer> reader = format.createReader(config, testPath, 0L, FILE_LEN);
+
+		final List<Integer> result = new ArrayList<>();
+		readNumbers(reader, result, NUM_NUMBERS);
+
+		verifyIntListResult(result);
+	}
+
+	// ------------------------------------------------------------------------
+	//  test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class CheckpointedIntFormat implements StreamFormat<Integer> {
+
+		@Override
+		public Reader<Integer> createReader(
+				Configuration config,
+				FSDataInputStream stream,
+				long fileLen,
+				long splitEnd) throws IOException {
+
+			assertEquals("invalid file length", 0, fileLen % 4);
+
+			// Round the split boundaries up to the next integer boundary. To simulate common split
+			// behavior, we round up even when we are already exactly on a boundary; the only
+			// exceptions are the very start and the very end of the file.
+			final long currPos = stream.getPos();
+			final long start = currPos == 0L ? 0L : currPos + 4 - currPos % 4;
+			final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+			stream.seek(start);
+
+			return new TestIntReader(stream, end, true);
+		}
+
+		@Override
+		public Reader<Integer> restoreReader(
+				Configuration config,
+				FSDataInputStream stream,
+				long restoredOffset,
+				long fileLen,
+				long splitEnd) throws IOException {
+
+			assertEquals("invalid file length", 0, fileLen % 4);
+
+			// round end position to the next integer boundary
+			final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+			// no rounding of checkpointed offset
+			stream.seek(restoredOffset);
+			return new TestIntReader(stream, end, true);
+		}
+
+		@Override
+		public boolean isSplittable() {
+			return true;
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+	}
+
+	private static final class NonCheckpointedIntFormat extends SimpleStreamFormat<Integer> {
+
+		@Override
+		public Reader<Integer> createReader(Configuration config, FSDataInputStream stream) throws IOException {
+			return new TestIntReader(stream, Long.MAX_VALUE, false);
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+	}
+
+	private static final class FailingInstantiationFormat implements StreamFormat<Integer> {
+
+		@Override
+		public Reader<Integer> createReader(
+				Configuration config,
+				FSDataInputStream stream,
+				long fileLen,
+				long splitEnd) throws IOException {
+			throw new IOException("test exception");
+		}
+
+		@Override
+		public Reader<Integer> restoreReader(
+				Configuration config,
+				FSDataInputStream stream,
+				long restoredOffset,
+				long fileLen,
+				long splitEnd) throws IOException {
+			throw new IOException("test exception");
+		}
+
+		@Override
+		public boolean isSplittable() {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return Types.INT;
+		}
+	}
+}
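
The simpleReadTest above also shows the one knob these tests tune: StreamFormat.FETCH_IO_SIZE controls how much data the StreamFormatAdapter pulls per batch. A small sketch of wrapping an arbitrary StreamFormat this way (not part of the patch; the concrete format and file length are supplied by the caller):

	package org.apache.flink.connector.file.src.impl;

	import org.apache.flink.configuration.Configuration;
	import org.apache.flink.configuration.MemorySize;
	import org.apache.flink.connector.file.src.reader.BulkFormat;
	import org.apache.flink.connector.file.src.reader.StreamFormat;
	import org.apache.flink.core.fs.Path;

	import java.io.IOException;

	class StreamFormatAdapterUsageSketch {

		/** Wraps a StreamFormat as a BulkFormat and opens a reader over the whole file. */
		static <T> BulkFormat.Reader<T> openWholeFile(StreamFormat<T> format, Path path, long fileLength) throws IOException {
			final Configuration config = new Configuration();
			// size of each fetch; the tests above use tiny values to force many small batches
			config.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(1 << 20));

			final BulkFormat<T> bulk = new StreamFormatAdapter<>(format);
			return bulk.createReader(config, path, 0L, fileLength);
		}
	}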
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java
new file mode 100644
index 0000000..b37e02e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Simple reader for integers that is both a {@link StreamFormat.Reader} and a {@link FileRecordFormat.Reader}.
+ */
+class TestIntReader implements StreamFormat.Reader<Integer>, FileRecordFormat.Reader<Integer> {
+
+	private static final int SKIPS_PER_OFFSET = 7;
+
+	private final FSDataInputStream in;
+	private final DataInputStream din;
+
+	private final long endOffset;
+	private long currentOffset;
+	private long currentSkipCount;
+
+	private final boolean checkpointed;
+
+	TestIntReader(FSDataInputStream in, long endOffset, boolean checkpointsPosition) throws IOException {
+		this.in = in;
+		this.endOffset = endOffset;
+		this.currentOffset = in.getPos();
+		this.din = new DataInputStream(in);
+		this.checkpointed = checkpointsPosition;
+	}
+
+	@Nullable
+	@Override
+	public Integer read() throws IOException {
+		if (in.getPos() >= endOffset) {
+			return null;
+		}
+
+		try {
+			final int next = din.readInt();
+			incrementPosition();
+			return next;
+		} catch (EOFException e) {
+			return null;
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		in.close();
+	}
+
+	@Nullable
+	@Override
+	public CheckpointedPosition getCheckpointedPosition() {
+		return checkpointed ? new CheckpointedPosition(currentOffset, currentSkipCount) : null;
+	}
+
+	private void incrementPosition() {
+		currentSkipCount++;
+		if (currentSkipCount >= SKIPS_PER_OFFSET) {
+			currentOffset += 4 * currentSkipCount;
+			currentSkipCount = 0;
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java
new file mode 100644
index 0000000..3596024
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.testutils;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link FileSystem} for tests containing a pre-defined set of files and directories.
+ *
+ * <p>The file system can be registered under its scheme at the file system registry, so that
+ * the {@link Path#getFileSystem()} method finds this test file system. Use the
+ * {@link TestingFileSystem#register()} method to do that, and don't forget to
+ * {@link TestingFileSystem#unregister()} when the test is done.
+ */
+public class TestingFileSystem extends FileSystem {
+
+	private final String scheme;
+
+	private final Map<Path, Collection<FileStatus>> directories;
+
+	private final Map<Path, TestFileStatus> files;
+
+	private TestingFileSystem(
+			final String scheme,
+			final Map<Path, Collection<FileStatus>> directories,
+			final Map<Path, TestFileStatus> files) {
+		this.scheme = scheme;
+		this.directories = directories;
+		this.files = files;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factories
+	// ------------------------------------------------------------------------
+
+	public static TestingFileSystem createWithFiles(final String scheme, final Path... files) {
+		return createWithFiles(scheme, Arrays.asList(files));
+	}
+
+	public static TestingFileSystem createWithFiles(final String scheme, final Collection<Path> files) {
+		checkNotNull(scheme, "scheme");
+		checkNotNull(files, "files");
+
+		final Collection<TestFileStatus> status = files.stream()
+				.map((path) -> TestFileStatus.forFileWithDefaultBlock(path, 10L << 20))
+				.collect(Collectors.toList());
+
+		return createForFileStatus(scheme, status);
+	}
+
+	public static TestingFileSystem createForFileStatus(final String scheme, final TestFileStatus... files) {
+		return createForFileStatus(scheme, Arrays.asList(files));
+	}
+
+	public static TestingFileSystem createForFileStatus(final String scheme, final Collection<TestFileStatus> files) {
+		checkNotNull(scheme, "scheme");
+		checkNotNull(files, "files");
+
+		final HashMap<Path, TestFileStatus> fileMap = new HashMap<>(files.size());
+		final HashMap<Path, Collection<FileStatus>> directories = new HashMap<>();
+
+		for (TestFileStatus file : files) {
+			if (fileMap.putIfAbsent(file.getPath(), file) != null) {
+				throw new IllegalStateException("Already have a status for path " + file.getPath());
+			}
+			addParentDirectories(file, fileMap, directories);
+		}
+
+		return new TestingFileSystem(scheme, directories, fileMap);
+	}
+
+	private static void addParentDirectories(
+			final TestFileStatus file,
+			final Map<Path, TestFileStatus> files,
+			final Map<Path, Collection<FileStatus>> directories) {
+
+		final Path parentPath = file.getPath().getParent();
+		if (parentPath == null) {
+			return;
+		}
+
+		final TestFileStatus parentStatus = TestFileStatus.forDirectory(parentPath);
+		directories.computeIfAbsent(parentPath, (key) -> new ArrayList<>()).add(file);
+
+		final TestFileStatus existingParent = files.putIfAbsent(parentPath, parentStatus);
+		if (existingParent == null) {
+			addParentDirectories(parentStatus, files, directories);
+		} else {
+			checkArgument(existingParent.isDir(), "already have a file for the directory path");
+		}
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  Test Utility
+	// ------------------------------------------------------------------------
+
+	public void register() throws Exception {
+		final Object key = createFsKey(scheme);
+		final Map<Object, Object> fsMap = getFsRegistry();
+		fsMap.put(key, this);
+	}
+
+	public void unregister() throws Exception {
+		final Object key = createFsKey(scheme);
+		final Map<Object, Object> fsMap = getFsRegistry();
+		fsMap.remove(key);
+	}
+
+	private static Object createFsKey(String scheme) throws Exception {
+		final Class<?> fsKeyClass = Class.forName("org.apache.flink.core.fs.FileSystem$FSKey");
+		final Constructor<?> ctor = fsKeyClass.getDeclaredConstructor(String.class, String.class);
+		ctor.setAccessible(true);
+		return ctor.newInstance(scheme, null);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static Map<Object, Object> getFsRegistry() throws Exception {
+		final Field cacheField = FileSystem.class.getDeclaredField("CACHE");
+		cacheField.setAccessible(true);
+		return (Map<Object, Object>) cacheField.get(null);
+	}
+
+	// ------------------------------------------------------------------------
+	//  File System Methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Path getWorkingDirectory() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Path getHomeDirectory() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public URI getUri() {
+		return URI.create(scheme + "://");
+	}
+
+	@Override
+	public FileStatus getFileStatus(Path f) throws IOException {
+		final FileStatus status = files.get(f);
+		if (status != null) {
+			return status;
+		} else {
+			throw new FileNotFoundException("File not found: " + f);
+		}
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+		final TestFileStatus status = file instanceof TestFileStatus
+				? (TestFileStatus) file
+				: files.get(file.getPath());
+
+		if (status == null) {
+			throw new FileNotFoundException(file.getPath().toString());
+		}
+
+		return status.getBlocks();
+	}
+
+	@Override
+	public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+		return open(f);
+	}
+
+	@Override
+	public FSDataInputStream open(Path f) throws IOException {
+		final TestFileStatus status = (TestFileStatus) getFileStatus(f);
+		if (status.stream != null) {
+			return status.stream;
+		} else {
+			throw new UnsupportedOperationException("No stream registered for this file");
+		}
+	}
+
+	@Override
+	public FileStatus[] listStatus(Path f) throws IOException {
+		final Collection<FileStatus> dirContents = directories.get(f);
+		if (dirContents != null) {
+			return dirContents.toArray(new FileStatus[dirContents.size()]);
+		} else {
+			throw new FileNotFoundException("Directory not found: " + f);
+		}
+	}
+
+	@Override
+	public boolean delete(Path f, boolean recursive) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean mkdirs(Path f) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean rename(Path src, Path dst) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+		return false;
+	}
+
+	@Override
+	public FileSystemKind getKind() {
+		return FileSystemKind.FILE_SYSTEM;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test implementation of a {@link FileStatus}.
+	 */
+	public static final class TestFileStatus implements FileStatus {
+
+		private static final long TIME = System.currentTimeMillis();
+
+		public static TestFileStatus forFileWithDefaultBlock(final Path path, final long len) {
+			return forFileWithBlocks(path, len, new TestBlockLocation(0L, len));
+		}
+
+		public static TestFileStatus forFileWithBlocks(final Path path, final long len, final BlockLocation... blocks) {
+			checkNotNull(blocks);
+			return new TestFileStatus(path, len, false, blocks, null);
+		}
+
+		public static TestFileStatus forFileWithStream(final Path path, final long len, final FSDataInputStream stream) {
+			return new TestFileStatus(path, len, false, new BlockLocation[] {new TestBlockLocation(0L, len)}, stream);
+		}
+
+		public static TestFileStatus forDirectory(final Path path) {
+			return new TestFileStatus(path, 4096L, true, null, null);
+		}
+
+		// ------------------------------------------------
+
+		private final Path path;
+		private final long len;
+		private final boolean isDir;
+		@Nullable private final BlockLocation[] blocks;
+		@Nullable private final FSDataInputStream stream;
+
+		private TestFileStatus(
+				final Path path,
+				final long len,
+				final boolean isDir,
+				@Nullable final BlockLocation[] blocks,
+				@Nullable final FSDataInputStream stream) {
+			this.path = path;
+			this.len = len;
+			this.isDir = isDir;
+			this.blocks = blocks;
+			this.stream = stream;
+		}
+
+		@Override
+		public long getLen() {
+			return len;
+		}
+
+		@Override
+		public long getBlockSize() {
+			return 64 << 20;
+		}
+
+		@Override
+		public short getReplication() {
+			return 1;
+		}
+
+		@Override
+		public long getModificationTime() {
+			return TIME;
+		}
+
+		@Override
+		public long getAccessTime() {
+			return TIME;
+		}
+
+		@Override
+		public boolean isDir() {
+			return isDir;
+		}
+
+		@Override
+		public Path getPath() {
+			return path;
+		}
+
+		public BlockLocation[] getBlocks() {
+			return blocks;
+		}
+
+		@Override
+		public String toString() {
+			return path.toString() + (isDir ? " [DIR]" : " [FILE]");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test implementation of a {@link BlockLocation}.
+	 */
+	public static final class TestBlockLocation implements BlockLocation {
+
+		private final String[] hosts;
+		private final long offset;
+		private final long length;
+
+		public TestBlockLocation(long offset, long length, String... hosts) {
+			checkArgument(offset >= 0);
+			checkArgument(length >= 0);
+			this.offset = offset;
+			this.length = length;
+			this.hosts = checkNotNull(hosts);
+		}
+
+		@Override
+		public String[] getHosts() throws IOException {
+			return hosts;
+		}
+
+		@Override
+		public long getOffset() {
+			return offset;
+		}
+
+		@Override
+		public long getLength() {
+			return length;
+		}
+
+		@Override
+		public int compareTo(BlockLocation o) {
+			return 0;
+		}
+	}
+}
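
For orientation, the factory methods of TestFileStatus and TestBlockLocation above can be combined to describe a file with an explicit block layout. A minimal usage sketch, not part of the commit; the path, sizes, and host names below are made up for illustration:

    // Hypothetical example built only from the test helpers shown above.
    Path file = new Path("testfs:///data/part-0");

    // A 256 MiB file made of two 128 MiB blocks, each reported on a different host.
    TestFileStatus status = TestFileStatus.forFileWithBlocks(
            file,
            256L << 20,
            new TestBlockLocation(0L, 128L << 20, "host-1"),
            new TestBlockLocation(128L << 20, 128L << 20, "host-2"));

    BlockLocation[] blocks = status.getBlocks();   // the two locations registered above

forFileWithStream(...) follows the same pattern; the stream it registers is what open(Path) hands back to the code under test.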
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java
new file mode 100644
index 0000000..a46ffcd
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link ArrayResultIterator}.
+ */
+public class ArrayResultIteratorTest {
+
+	@Test
+	public void testEmptyConstruction() {
+		final ArrayResultIterator<Object> iter = new ArrayResultIterator<>();
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testGetElements() {
+		final String[] elements = new String[] { "1", "2", "3", "4"};
+		final long initialPos = 1422;
+		final long initialSkipCount = 17;
+
+		final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+		iter.set(elements, elements.length, initialPos, initialSkipCount);
+
+		for (int i = 0; i < elements.length; i++) {
+			final RecordAndPosition<String> recAndPos = iter.next();
+			assertEquals(elements[i], recAndPos.getRecord());
+			assertEquals(initialPos, recAndPos.getOffset());
+			assertEquals(initialSkipCount + i + 1, recAndPos.getRecordSkipCount());
+		}
+	}
+
+	@Test
+	public void testExhausted() {
+		final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+		iter.set(new String[] { "1", "2"}, 2, 0L, 0L);
+
+		iter.next();
+		iter.next();
+
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testArraySubRange() {
+		final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+		iter.set(new String[] { "1", "2", "3"}, 2, 0L, 0L);
+
+		assertNotNull(iter.next());
+		assertNotNull(iter.next());
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testNoRecycler() {
+		final ArrayResultIterator<Object> iter = new ArrayResultIterator<>();
+		iter.releaseBatch();
+	}
+
+	@Test
+	public void testRecycler() {
+		final AtomicBoolean recycled = new AtomicBoolean();
+		final ArrayResultIterator<Object> iter = new ArrayResultIterator<>(() -> recycled.set(true));
+
+		iter.releaseBatch();
+
+		assertTrue(recycled.get());
+	}
+}
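
As the tests above suggest, an ArrayResultIterator is filled once per batch and then iterated until exhausted. A hedged sketch of that pattern, with a hypothetical process(...) consumer and made-up offsets:

    // Illustrative only: emit one batch of records together with its starting position.
    String[] batch = new String[] {"a", "b", "c"};
    long batchStartOffset = 4096L;   // hypothetical byte offset where this batch begins
    long recordsBefore = 42L;        // hypothetical count of records emitted before the batch

    ArrayResultIterator<String> result = new ArrayResultIterator<>();
    result.set(batch, batch.length, batchStartOffset, recordsBefore);

    RecordAndPosition<String> rec;
    while ((rec = result.next()) != null) {
        // every record of the batch reports the batch offset and an increasing skip count
        process(rec.getRecord(), rec.getOffset(), rec.getRecordSkipCount());
    }
    result.releaseBatch();   // runs the recycler, if one was passed to the constructor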
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java
new file mode 100644
index 0000000..e355e79
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for the {@link IteratorResultIterator}.
+ */
+public class IteratorResultIteratorTest {
+
+	@Test
+	public void testGetElements() {
+		final String[] elements = new String[] { "1", "2", "3", "4"};
+		final long initialPos = 1422;
+		final long initialSkipCount = 17;
+
+		final IteratorResultIterator<String> iter = new IteratorResultIterator<>(
+				Arrays.asList(elements).iterator(), initialPos, initialSkipCount);
+
+		for (int i = 0; i < elements.length; i++) {
+			final RecordAndPosition<String> recAndPos = iter.next();
+			assertEquals(elements[i], recAndPos.getRecord());
+			assertEquals(initialPos, recAndPos.getOffset());
+			assertEquals(initialSkipCount + i + 1, recAndPos.getRecordSkipCount());
+		}
+	}
+
+	@Test
+	public void testExhausted() {
+		final IteratorResultIterator<String> iter = new IteratorResultIterator<>(
+				Arrays.asList("1", "2").iterator(), 0L, 0L);
+
+		iter.next();
+		iter.next();
+
+		assertNull(iter.next());
+	}
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java
new file mode 100644
index 0000000..e1bbcd4
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link SingletonResultIterator}.
+ */
+public class SingletonResultIteratorTest {
+
+	@Test
+	public void testEmptyConstruction() {
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testGetElement() {
+		final Object element = new Object();
+		final long pos = 1422;
+		final long skipCount = 17;
+
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+		iter.set(element, pos, skipCount);
+
+		final RecordAndPosition<Object> record = iter.next();
+		assertNotNull(record);
+		assertEquals(element, record.getRecord());
+		assertEquals(pos, record.getOffset());
+		assertEquals(skipCount, record.getRecordSkipCount());
+	}
+
+	@Test
+	public void testExhausted() {
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+		iter.set(new Object(), 1, 2);
+		iter.next();
+
+		assertNull(iter.next());
+	}
+
+	@Test
+	public void testNoRecycler() {
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+		iter.releaseBatch();
+	}
+
+	@Test
+	public void testRecycler() {
+		final AtomicBoolean recycled = new AtomicBoolean();
+		final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+
+		iter.releaseBatch();
+
+		assertTrue(recycled.get());
+	}
+}
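
SingletonResultIterator covers the record-at-a-time case and, like its array sibling, can carry a recycler that runs on releaseBatch(). A short sketch under the same caveats (pool, readOneRecord, and the other helpers are hypothetical):

    // Illustrative only: wrap a single record per set()/next() cycle.
    SingletonResultIterator<byte[]> iter = new SingletonResultIterator<>(pool::recycle);

    byte[] record = readOneRecord();        // hypothetical
    long offset = currentOffset();          // hypothetical position of this record
    long skipCount = recordsReadSoFar();    // hypothetical

    iter.set(record, offset, skipCount);
    emit(iter.next());                      // hypothetical downstream hand-off
    iter.releaseBatch();                    // invokes pool::recycle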
diff --git a/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..b4d2ba9
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF to not flood build logs.
+# Set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 8a0d867..9401351 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -55,6 +55,7 @@ under the License.
 		<module>flink-connector-gcp-pubsub</module>
 		<module>flink-connector-kinesis</module>
 		<module>flink-connector-base</module>
+		<module>flink-connector-files</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index b8a005c..d22b0f1 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -144,6 +144,12 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-files</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<!-- Default file system support. The Hadoop and MapR dependencies -->
 		<!--       are optional, so not being added to the dist jar        -->
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 1af572c..05b3216 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -173,11 +173,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 			sourceReader.addSplits(splits);
 		}
 
-		// Start the reader.
-		sourceReader.start();
 		// Register the reader to the coordinator.
 		registerReader();
 
+		// Start the reader after registration; sending messages from within start() is allowed.
+		sourceReader.start();
+
 		eventTimeLogic.startPeriodicWatermarkEmits();
 	}