Posted to commits@flink.apache.org by se...@apache.org on 2020/09/21 19:17:15 UTC
[flink] 01/02: [FLINK-19161][file connector] Add first version of the FLIP-27 File Source
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 90d01f7409c579313e70fcbfa9e0342e6af50ff8
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 11:00:59 2020 +0200
[FLINK-19161][file connector] Add first version of the FLIP-27 File Source
This closes #13401
---
.../SingleThreadMultiplexSourceReaderBase.java | 63 ++-
.../source/reader/mocks/TestingReaderContext.java | 4 +-
flink-connectors/flink-connector-files/pom.xml | 83 ++++
.../file/src/ContinuousEnumerationSettings.java | 55 +++
.../flink/connector/file/src/FileSource.java | 437 +++++++++++++++++++++
.../flink/connector/file/src/FileSourceSplit.java | 219 +++++++++++
.../file/src/FileSourceSplitSerializer.java | 128 ++++++
.../connector/file/src/FileSourceSplitState.java | 106 +++++
.../file/src/PendingSplitsCheckpoint.java | 93 +++++
.../src/PendingSplitsCheckpointSerializer.java | 150 +++++++
.../file/src/assigners/FileSplitAssigner.java | 71 ++++
.../file/src/assigners/SimpleSplitAssigner.java | 65 +++
.../src/compression/StandardDeCompressors.java | 105 +++++
.../BlockSplittingRecursiveEnumerator.java | 150 +++++++
.../file/src/enumerate/DefaultFileFilter.java | 42 ++
.../file/src/enumerate/FileEnumerator.java | 57 +++
.../enumerate/NonSplittingRecursiveEnumerator.java | 144 +++++++
.../src/impl/ContinuousFileSplitEnumerator.java | 162 ++++++++
.../file/src/impl/FileRecordFormatAdapter.java | 168 ++++++++
.../flink/connector/file/src/impl/FileRecords.java | 109 +++++
.../connector/file/src/impl/FileSourceReader.java | 72 ++++
.../file/src/impl/FileSourceRecordEmitter.java | 47 +++
.../file/src/impl/FileSourceSplitReader.java | 116 ++++++
.../file/src/impl/StaticFileSplitEnumerator.java | 128 ++++++
.../file/src/impl/StreamFormatAdapter.java | 269 +++++++++++++
.../connector/file/src/reader/BulkFormat.java | 199 ++++++++++
.../file/src/reader/FileRecordFormat.java | 200 ++++++++++
.../file/src/reader/SimpleStreamFormat.java | 115 ++++++
.../connector/file/src/reader/StreamFormat.java | 219 +++++++++++
.../connector/file/src/reader/TextLineFormat.java | 96 +++++
.../file/src/util/ArrayResultIterator.java | 90 +++++
.../file/src/util/CheckpointedPosition.java | 107 +++++
.../file/src/util/IteratorResultIterator.java | 95 +++++
.../file/src/util/MutableRecordAndPosition.java | 57 +++
.../apache/flink/connector/file/src/util/Pool.java | 115 ++++++
.../connector/file/src/util/RecordAndPosition.java | 90 +++++
.../file/src/util/RecyclableIterator.java | 50 +++
.../file/src/util/SingletonResultIterator.java | 75 ++++
.../flink/connector/file/src/util/Utils.java | 72 ++++
.../file/src/FileSourceHeavyThroughputTest.java | 230 +++++++++++
.../file/src/FileSourceSplitSerializerTest.java | 128 ++++++
.../file/src/FileSourceSplitStateTest.java | 82 ++++
.../connector/file/src/FileSourceSplitTest.java | 34 ++
.../file/src/FileSourceTextLinesITCase.java | 273 +++++++++++++
.../src/PendingSplitsCheckpointSerializerTest.java | 150 +++++++
.../BlockSplittingRecursiveEnumeratorTest.java | 62 +++
.../NonSplittingRecursiveEnumeratorTest.java | 204 ++++++++++
.../connector/file/src/impl/AdapterTestBase.java | 284 +++++++++++++
.../file/src/impl/FileRecordFormatAdapterTest.java | 199 ++++++++++
.../connector/file/src/impl/FileRecordsTest.java | 126 ++++++
.../file/src/impl/StreamFormatAdapterTest.java | 197 ++++++++++
.../connector/file/src/impl/TestIntReader.java | 90 +++++
.../file/src/testutils/TestingFileSystem.java | 391 ++++++++++++++++++
.../file/src/util/ArrayResultIteratorTest.java | 94 +++++
.../file/src/util/IteratorResultIteratorTest.java | 60 +++
.../file/src/util/SingletonResultIteratorTest.java | 81 ++++
.../src/test/resources/log4j2-test.properties | 28 ++
flink-connectors/pom.xml | 1 +
flink-dist/pom.xml | 6 +
.../streaming/api/operators/SourceOperator.java | 5 +-
60 files changed, 7335 insertions(+), 13 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index ab87db0..f5806d1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -26,24 +26,67 @@ import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcher
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import java.util.Collection;
import java.util.function.Supplier;
/**
- * A abstract {@link SourceReader} implementation that assign all the splits to a single thread to consume.
- * @param <E>
- * @param <T>
- * @param <SplitT>
- * @param <SplitStateT>
+ * A base for {@link SourceReader}s that read splits with one thread using one {@link SplitReader}.
+ * The splits can be read either one after the other (like in a file source) or concurrently by changing
+ * the subscription in the split reader (like in the Kafka Source).
+ *
+ * <p>To implement a source reader based on this class, implementors need to supply the following:
+ * <ul>
+ * <li>A {@link SplitReader}, which connects to the source and reads/polls data. The split reader
+ * gets notified whenever there is a new split. The split reader would, for example, read files or
+ * wrap a Kafka or other source client.</li>
+ * <li>A {@link RecordEmitter} that takes a record from the Split Reader and updates the checkpointing
+ * state and converts it into the final form. For example for Kafka, the Record Emitter takes a
+ * {@code ConsumerRecord}, puts the offset information into state, transforms the records with the
+ * deserializers into the final type, and emits the record.</li>
+ * <li>The class must override the methods to convert back and forth between the immutable splits
+ * ({@code SplitT}) and the mutable split state representation ({@code SplitStateT}).</li>
+ * <li>Finally, the reader must decide what to do when it starts ({@link #start()}) or when a split is
+ * finished ({@link #onSplitFinished(Collection)}).</li>
+ * </ul>
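+ *
+ * <p>As a rough construction sketch (with hypothetical {@code MySplitReader} and
+ * {@code MyRecordEmitter} implementations of the interfaces above):
+ * <pre>{@code
+ * public MySourceReader(Configuration config, SourceReaderContext context) {
+ *     super(() -> new MySplitReader(), new MyRecordEmitter(), config, context);
+ * }
+ * }</pre>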
+ *
+ * @param <E> The type of the records (the raw type that typically contains checkpointing information).
+ * @param <T> The final type of the records emitted by the source.
+ * @param <SplitT> The type of the splits processed by the source.
+ * @param <SplitStateT> The type of the mutable state per split.
*/
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+ /**
+ * The primary constructor for the source reader.
+ *
+ * <p>The reader will use a handover queue sized as configured via
+ * {@link SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
+ */
+ public SingleThreadMultiplexSourceReaderBase(
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ Configuration config,
+ SourceReaderContext context) {
+ this(
+ new FutureCompletingBlockingQueue<>(config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
+ splitReaderSupplier,
+ recordEmitter,
+ config,
+ context);
+ }
+
+ /**
+ * This constructor behaves like
+ * {@link #SingleThreadMultiplexSourceReaderBase(Supplier, RecordEmitter, Configuration, SourceReaderContext)},
+ * but accepts a specific {@link FutureCompletingBlockingQueue}.
+ */
public SingleThreadMultiplexSourceReaderBase(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
- Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
- RecordEmitter<E, T, SplitStateT> recordEmitter,
- Configuration config,
- SourceReaderContext context) {
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ Configuration config,
+ SourceReaderContext context) {
super(
elementsQueue,
new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier),
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
index 02faf1f..5bb9b59 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
@@ -64,7 +64,9 @@ public class TestingReaderContext implements SourceReaderContext {
}
@Override
- public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+ public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+ sentEvents.add(sourceEvent);
+ }
// ------------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-files/pom.xml b/flink-connectors/flink-connector-files/pom.xml
new file mode 100644
index 0000000..e1a5149
--- /dev/null
+++ b/flink-connectors/flink-connector-files/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connectors</artifactId>
+ <version>1.12-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-files</artifactId>
+ <name>Flink : Connectors : Files</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>1.12-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
new file mode 100644
index 0000000..4123c43
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Settings describing how to do continuous file discovery and enumeration for the
+ * file source's streaming mode.
+ */
+@Internal
+final class ContinuousEnumerationSettings implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Duration discoveryInterval;
+
+ ContinuousEnumerationSettings(Duration discoveryInterval) {
+ this.discoveryInterval = checkNotNull(discoveryInterval);
+ }
+
+ public Duration getDiscoveryInterval() {
+ return discoveryInterval;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "ContinuousEnumerationSettings{" +
+ "discoveryInterval=" + discoveryInterval +
+ '}';
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
new file mode 100644
index 0000000..296abf9
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
+import org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator;
+import org.apache.flink.connector.file.src.impl.FileRecordFormatAdapter;
+import org.apache.flink.connector.file.src.impl.FileSourceReader;
+import org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator;
+import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified data source that reads files - both in batch and in streaming mode.
+ *
+ * <p>This source supports all (distributed) file systems and object stores that can be
+ * accessed via the Flink's {@link FileSystem} class.
+ *
+ * <p>Start building a file source via one of the following calls:
+ * <ul>
+ * <li>{@link FileSource#forRecordStreamFormat(StreamFormat, Path...)}</li>
+ * <li>{@link FileSource#forBulkFileFormat(BulkFormat, Path...)}</li>
+ * <li>{@link FileSource#forRecordFileFormat(FileRecordFormat, Path...)}</li>
+ * </ul>
+ * This creates a {@link FileSource.FileSourceBuilder} on which you can configure all the
+ * properties of the file source.
+ *
+ * <h2>Batch and Streaming</h2>
+ *
+ * <p>This source supports both bounded/batch and continuous/streaming data inputs. For the
+ * bounded/batch case, the file source processes all files under the given path(s).
+ * In the continuous/streaming case, the source periodically checks the paths for new files
+ * and will start reading those.
+ *
+ * <p>When you start creating a file source (via the {@link FileSource.FileSourceBuilder} created
+ * through one of the above-mentioned methods) the source is by default in bounded/batch mode.
+ * Call {@link FileSource.FileSourceBuilder#monitorContinuously(Duration)} to put the source
+ * into continuous streaming mode.
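+ *
+ * <p>For example, a streaming source over text files might be built as follows (a sketch;
+ * the path and discovery interval are illustrative):
+ * <pre>{@code
+ * FileSource<String> source = FileSource
+ *     .forRecordStreamFormat(new TextLineFormat(), new Path("file:///var/logs"))
+ *     .monitorContinuously(Duration.ofSeconds(10))
+ *     .build();
+ * }</pre>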
+ *
+ * <h2>Format Types</h2>
+ *
+ * <p>The reading of each file happens through file readers defined by <i>file formats</i>.
+ * These define the parsing logic for the contents of the file. There are multiple classes that
+ * the source supports. Their interfaces trade off simplicity of implementation against flexibility/efficiency.
+ * <ul>
+ * <li>A {@link StreamFormat} reads the contents of a file from a file stream. It is the simplest
+ * format to implement, and provides many features out-of-the-box (like checkpointing logic)
+ * but is limited in the optimizations it can apply (such as object reuse, batching, etc.).</li>
+ * <li>A {@link BulkFormat} reads batches of records from a file at a time. It is the most "low level"
+ * format to implement, but offers the greatest flexibility to optimize the implementation.</li>
+ * <li>A {@link FileRecordFormat} is in the middle of the trade-off spectrum between the
+ * {@code StreamFormat} and the {@code BulkFormat}.</li>
+ * </ul>
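+ *
+ * <p>Each format type has a matching entry point on this class (a sketch; the {@code format}
+ * and {@code path} variables are assumed to exist):
+ * <pre>{@code
+ * FileSource.forRecordStreamFormat(streamFormat, path);   // StreamFormat
+ * FileSource.forBulkFileFormat(bulkFormat, path);         // BulkFormat
+ * FileSource.forRecordFileFormat(fileRecordFormat, path); // FileRecordFormat
+ * }</pre>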
+ *
+ * <h2>Discovering / Enumerating Files</h2>
+ *
+ * <p>The way that the source lists the files to be processed is defined by the {@link FileEnumerator}.
+ * The {@code FileEnumerator} is responsible for selecting the relevant files (for example, filtering out
+ * hidden files) and for optionally splitting files into multiple regions (= file source splits) that
+ * can be read in parallel.
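+ *
+ * <p>Both the enumerator and the split assigner can be swapped on the builder, for example
+ * (a sketch using the enumerator and assigner shipped in this commit):
+ * <pre>{@code
+ * FileSource.forBulkFileFormat(bulkFormat, path)
+ *     .setFileEnumerator(() -> new NonSplittingRecursiveEnumerator())
+ *     .setSplitAssigner(SimpleSplitAssigner::new)
+ *     .build();
+ * }</pre>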
+ *
+ * @param <T> The type of the events/records produced by this source.
+ */
+@PublicEvolving
+public final class FileSource<T> implements Source<T, FileSourceSplit, PendingSplitsCheckpoint>, ResultTypeQueryable<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The default split assigner, a lazy non-locality-aware assigner.
+ */
+ public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER = SimpleSplitAssigner::new;
+
+ /**
+ * The default file enumerator used for splittable formats.
+ * The enumerator recursively enumerates files, splits files that consist of multiple distributed storage
+ * blocks into multiple splits, and filters out hidden files (files starting with '.' or '_').
+ * Files with suffixes of common compression formats (for example '.gzip', '.bz2', '.xz', '.zip', ...)
+ * will not be split.
+ */
+ public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR = BlockSplittingRecursiveEnumerator::new;
+
+ /**
+ * The default file enumerator used for non-splittable formats.
+ * The enumerator recursively enumerates files, creates one split per file, and filters out hidden
+ * files (files starting with '.' or '_').
+ */
+ public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR = NonSplittingRecursiveEnumerator::new;
+
+ // ------------------------------------------------------------------------
+
+ private final Path[] inputPaths;
+
+ private final FileEnumerator.Provider enumeratorFactory;
+
+ private final FileSplitAssigner.Provider assignerFactory;
+
+ private final BulkFormat<T> readerFormat;
+
+ @Nullable
+ private final ContinuousEnumerationSettings continuousEnumerationSettings;
+
+ // ------------------------------------------------------------------------
+
+ private FileSource(
+ final Path[] inputPaths,
+ final FileEnumerator.Provider fileEnumerator,
+ final FileSplitAssigner.Provider splitAssigner,
+ final BulkFormat<T> readerFormat,
+ @Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) {
+
+ checkArgument(inputPaths.length > 0);
+ this.inputPaths = inputPaths;
+ this.enumeratorFactory = checkNotNull(fileEnumerator);
+ this.assignerFactory = checkNotNull(splitAssigner);
+ this.readerFormat = checkNotNull(readerFormat);
+ this.continuousEnumerationSettings = continuousEnumerationSettings;
+ }
+
+ // ------------------------------------------------------------------------
+ // Source API Methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public Boundedness getBoundedness() {
+ return continuousEnumerationSettings == null ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ @Override
+ public SourceReader<T, FileSourceSplit> createReader(SourceReaderContext readerContext) {
+ return new FileSourceReader<>(readerContext, readerFormat, readerContext.getConfiguration());
+ }
+
+ @Override
+ public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> createEnumerator(
+ SplitEnumeratorContext<FileSourceSplit> enumContext) {
+
+ final FileEnumerator enumerator = enumeratorFactory.create();
+
+ // read the initial set of splits (which is also the total set of splits for bounded sources)
+ final Collection<FileSourceSplit> splits;
+ try {
+ // TODO - in the next cleanup pass, we should try to remove the need to "wrap unchecked" here
+ splits = enumerator.enumerateSplits(inputPaths, enumContext.currentParallelism());
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Could not enumerate file splits", e);
+ }
+
+ return createSplitEnumerator(enumContext, enumerator, splits, null);
+ }
+
+ @Override
+ public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
+ SplitEnumeratorContext<FileSourceSplit> enumContext,
+ PendingSplitsCheckpoint checkpoint) throws IOException {
+
+ final FileEnumerator enumerator = enumeratorFactory.create();
+
+ return createSplitEnumerator(enumContext, enumerator, checkpoint.getSplits(), checkpoint.getAlreadyProcessedPaths());
+ }
+
+ @Override
+ public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
+ return FileSourceSplitSerializer.INSTANCE;
+ }
+
+ @Override
+ public SimpleVersionedSerializer<PendingSplitsCheckpoint> getEnumeratorCheckpointSerializer() {
+ return PendingSplitsCheckpointSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return readerFormat.getProducedType();
+ }
+
+ // ------------------------------------------------------------------------
+ // helpers
+ // ------------------------------------------------------------------------
+
+ private SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> createSplitEnumerator(
+ SplitEnumeratorContext<FileSourceSplit> context,
+ FileEnumerator enumerator,
+ Collection<FileSourceSplit> splits,
+ @Nullable Collection<Path> alreadyProcessedPaths) {
+
+ final FileSplitAssigner splitAssigner = assignerFactory.create(splits);
+
+ if (continuousEnumerationSettings == null) {
+ // bounded case
+ return new StaticFileSplitEnumerator(context, splitAssigner);
+ } else {
+ // unbounded case
+ if (alreadyProcessedPaths == null) {
+ alreadyProcessedPaths = splitsToPaths(splits);
+ }
+
+ return new ContinuousFileSplitEnumerator(
+ context,
+ enumerator,
+ splitAssigner,
+ inputPaths,
+ alreadyProcessedPaths,
+ continuousEnumerationSettings.getDiscoveryInterval().toMillis());
+ }
+ }
+
+ private static Collection<Path> splitsToPaths(Collection<FileSourceSplit> splits) {
+ return splits.stream()
+ .map(FileSourceSplit::path)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
+ // ------------------------------------------------------------------------
+ // Entry-point Factory Methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Builds a new {@code FileSource} using a {@link StreamFormat} to read record-by-record from a
+ * file stream.
+ *
+ * <p>When possible, stream-based formats are generally easier to implement than (and preferable to)
+ * file-based formats, because they support better default behavior around I/O batching and better
+ * progress tracking to avoid re-doing work on recovery.
+ */
+ public static <T> FileSourceBuilder<T> forRecordStreamFormat(final StreamFormat<T> reader, final Path... paths) {
+ checkNotNull(reader, "reader");
+ checkNotNull(paths, "paths");
+ checkArgument(paths.length > 0, "paths must not be empty");
+
+ final BulkFormat<T> bulkFormat = new StreamFormatAdapter<>(reader);
+ return new FileSourceBuilder<>(paths, bulkFormat);
+ }
+
+ /**
+ * Builds a new {@code FileSource} using a {@link BulkFormat} to read batches of records
+ * from files.
+ *
+ * <p>Examples for bulk readers are compressed and vectorized formats such as ORC or Parquet.
+ */
+ public static <T> FileSourceBuilder<T> forBulkFileFormat(final BulkFormat<T> reader, final Path... paths) {
+ checkNotNull(reader, "reader");
+ checkNotNull(paths, "paths");
+ checkArgument(paths.length > 0, "paths must not be empty");
+
+ return new FileSourceBuilder<>(paths, reader);
+ }
+
+ /**
+ * Builds a new {@code FileSource} using a {@link FileRecordFormat} to read record-by-record from
+ * a file path.
+ *
+ * <p>A {@code FileRecordFormat} is more general than the {@link StreamFormat}, but also
+ * often requires more careful parameterization.
+ */
+ public static <T> FileSourceBuilder<T> forRecordFileFormat(final FileRecordFormat<T> reader, final Path... paths) {
+ checkNotNull(reader, "reader");
+ checkNotNull(paths, "paths");
+ checkArgument(paths.length > 0, "paths must not be empty");
+
+ final BulkFormat<T> bulkFormat = new FileRecordFormatAdapter<>(reader);
+ return new FileSourceBuilder<>(paths, bulkFormat);
+ }
+
+ // ------------------------------------------------------------------------
+ // Builder
+ // ------------------------------------------------------------------------
+
+ /**
+ * The builder for the {@code FileSource}, to configure the various behaviors.
+ *
+ * <p>Start building the source via one of the following methods:
+ * <ul>
+ * <li>{@link FileSource#forRecordStreamFormat(StreamFormat, Path...)}</li>
+ * <li>{@link FileSource#forBulkFileFormat(BulkFormat, Path...)}</li>
+ * <li>{@link FileSource#forRecordFileFormat(FileRecordFormat, Path...)}</li>
+ * </ul>
+ */
+ public static final class FileSourceBuilder<T> extends AbstractFileSourceBuilder<T, FileSourceBuilder<T>> {
+ public FileSourceBuilder(Path[] inputPaths, BulkFormat<T> readerFormat) {
+ super(inputPaths, readerFormat);
+ }
+ }
+
+ /**
+ * The generic base builder. This builder carries a <i>SELF</i> type to make it convenient to
+ * extend this for subclasses, using the following pattern.
+ * <pre>{@code
+ * public class SubBuilder<T> extends AbstractFileSourceBuilder<T, SubBuilder<T>> {
+ * ...
+ * }
+ * }</pre>
+ * That way, all return values from builder methods defined here are typed to the sub-class
+ * type and support fluent chaining.
+ *
+ * <p>We don't make the publicly visible builder generic with a SELF type, because it leads to
+ * generic signatures that can look complicated and confusing.
+ */
+ public static class AbstractFileSourceBuilder<T, SELF extends AbstractFileSourceBuilder<T, SELF>> {
+
+ // mandatory - have no defaults
+ private final Path[] inputPaths;
+ private final BulkFormat<T> readerFormat;
+
+ // optional - have defaults
+ private FileEnumerator.Provider fileEnumerator;
+ private FileSplitAssigner.Provider splitAssigner;
+ @Nullable
+ private ContinuousEnumerationSettings continuousSourceSettings;
+
+ protected AbstractFileSourceBuilder(Path[] inputPaths, BulkFormat<T> readerFormat) {
+ this.inputPaths = checkNotNull(inputPaths);
+ this.readerFormat = checkNotNull(readerFormat);
+
+ this.fileEnumerator = readerFormat.isSplittable()
+ ? DEFAULT_SPLITTABLE_FILE_ENUMERATOR
+ : DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR;
+ this.splitAssigner = DEFAULT_SPLIT_ASSIGNER;
+ }
+
+ /**
+ * Creates the file source with the settings applied to this builder.
+ */
+ public FileSource<T> build() {
+ return new FileSource<>(
+ inputPaths,
+ fileEnumerator,
+ splitAssigner,
+ readerFormat,
+ continuousSourceSettings);
+ }
+
+ /**
+ * Sets this source to streaming ("continuous monitoring") mode.
+ *
+ * <p>This makes the source a "continuous streaming" source that keeps running, monitors
+ * for new files, and starts reading them as soon as they are discovered.
+ *
+ * <p>The interval in which the source checks for new files is the {@code discoveryInterval}.
+ * Shorter intervals mean that files are discovered more quickly, but also imply more frequent
+ * listing or directory traversal of the file system / object store.
+ */
+ public SELF monitorContinuously(Duration discoveryInterval) {
+ checkNotNull(discoveryInterval, "discoveryInterval");
+ checkArgument(!(discoveryInterval.isNegative() || discoveryInterval.isZero()), "discoveryInterval must be > 0");
+
+ this.continuousSourceSettings = new ContinuousEnumerationSettings(discoveryInterval);
+ return self();
+ }
+
+ /**
+ * Sets this source to bounded (batch) mode.
+ *
+ * <p>In this mode, the source processes the files that are under the given paths when the
+ * application is started. Once all files are processed, the source will finish.
+ *
+ * <p>This setting is also the default behavior. This method is mainly here to "switch back" to
+ * bounded (batch) mode, or to make it explicit in the source construction.
+ */
+ public SELF processStaticFileSet() {
+ this.continuousSourceSettings = null;
+ return self();
+ }
+
+ /**
+ * Configures the {@link FileEnumerator} for the source.
+ * The File Enumerator is responsible for selecting from the input path the set of files
+ * that should be processed (and which to filter out). Furthermore, the File Enumerator
+ * may split the files further into sub-regions, to enable parallelization beyond the number
+ * of files.
+ */
+ public SELF setFileEnumerator(FileEnumerator.Provider fileEnumerator) {
+ this.fileEnumerator = checkNotNull(fileEnumerator);
+ return self();
+ }
+
+ /**
+ * Configures the {@link FileSplitAssigner} for the source.
+ * The File Split Assigner determines which parallel reader instance gets which
+ * {@link FileSourceSplit}, and in which order these splits are assigned.
+ */
+ public SELF setSplitAssigner(FileSplitAssigner.Provider splitAssigner) {
+ this.splitAssigner = checkNotNull(splitAssigner);
+ return self();
+ }
+
+ @SuppressWarnings("unchecked")
+ private SELF self() {
+ return (SELF) this;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
new file mode 100644
index 0000000..7f85618
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplit.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceSplit} that represents a file, or a region of a file.
+ *
+ * <p>The split has an offset and an end, which defines the region of the file represented by
+ * the split. For splits representing the whole file, the offset is zero and the length is the
+ * file size.
+ *
+ * <p>The split may furthermore have a "reader position", which is the checkpointed position from
+ * a reader previously reading this split. This position is typically null when the split is assigned
+ * from the enumerator to the readers, and is non-null when the readers checkpoint their state
+ * in a file source split.
+ *
+ * <p>This class is {@link Serializable} for convenience. For Flink's internal serialization (both for
+ * RPC and for checkpoints), the {@link FileSourceSplitSerializer} is used.
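+ *
+ * <p>Creation sketch (path and length values are illustrative):
+ * <pre>{@code
+ * FileSourceSplit split = new FileSourceSplit("split-1", new Path("file:///data/file1"), 0, 1024);
+ * }</pre>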
+ */
+@PublicEvolving
+public class FileSourceSplit implements SourceSplit, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String[] NO_HOSTS = StringUtils.EMPTY_STRING_ARRAY;
+
+ /** The unique ID of the split. Unique within the scope of this source. */
+ private final String id;
+
+ /** The path of the file referenced by this split. */
+ private final Path filePath;
+
+ /** The position of the first byte in the file to process. */
+ private final long offset;
+
+ /** The number of bytes in the file to process. */
+ private final long length;
+
+ /** The names of the hosts storing this range of the file. Empty, if no host information is available. */
+ private final String[] hostnames;
+
+ /** The precise reader position in the split, to resume from. */
+ @Nullable
+ private final CheckpointedPosition readerPosition;
+
+ /** The splits are frequently serialized into checkpoints.
+ * Caching the byte representation makes repeated serialization cheap.
+ * This field is used by {@link FileSourceSplitSerializer}. */
+ @Nullable
+ transient byte[] serializedFormCache;
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Constructs a split without host information.
+ *
+ * @param id The unique ID of this source split.
+ * @param filePath The path to the file.
+ * @param offset The start (inclusive) of the split's range in the file.
+ * @param length The number of bytes in the split (starting from the offset).
+ */
+ public FileSourceSplit(String id, Path filePath, long offset, long length) {
+ this(id, filePath, offset, length, NO_HOSTS);
+ }
+
+ /**
+ * Constructs a split with host information.
+ *
+ * @param id The unique ID of this source split.
+ * @param filePath The path to the file.
+ * @param offset The start (inclusive) of the split's range in the file.
+ * @param length The number of bytes in the split (starting from the offset).
+ * @param hostnames The hostnames of the nodes storing the split's file range.
+ */
+ public FileSourceSplit(String id, Path filePath, long offset, long length, String... hostnames) {
+ this(id, filePath, offset, length, hostnames, null, null);
+ }
+
+ /**
+ * Constructs a split with host information and a checkpointed reader position.
+ *
+ * @param id The unique ID of this source split.
+ * @param filePath The path to the file.
+ * @param offset The start (inclusive) of the split's range in the file.
+ * @param length The number of bytes in the split (starting from the offset).
+ * @param hostnames The hostnames of the nodes storing the split's file range.
+ * @param readerPosition The checkpointed position of a reader previously processing this split.
+ */
+ public FileSourceSplit(
+ String id,
+ Path filePath,
+ long offset,
+ long length,
+ String[] hostnames,
+ @Nullable CheckpointedPosition readerPosition) {
+ this(id, filePath, offset, length, hostnames, readerPosition, null);
+ }
+
+ /**
+ * Package private constructor, used by the serializers to directly cache the serialized form.
+ */
+ FileSourceSplit(
+ String id,
+ Path filePath,
+ long offset,
+ long length,
+ String[] hostnames,
+ @Nullable CheckpointedPosition readerPosition,
+ @Nullable byte[] serializedForm) {
+
+ checkArgument(offset >= 0, "offset must be >= 0");
+ checkArgument(length >= 0, "length must be >= 0");
+ checkNoNullHosts(hostnames);
+
+ this.id = checkNotNull(id);
+ this.filePath = checkNotNull(filePath);
+ this.offset = offset;
+ this.length = length;
+ this.hostnames = hostnames;
+ this.readerPosition = readerPosition;
+ this.serializedFormCache = serializedForm;
+ }
+
+ // ------------------------------------------------------------------------
+ // split properties
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String splitId() {
+ return id;
+ }
+
+ /**
+ * Gets the file's path.
+ */
+ public Path path() {
+ return filePath;
+ }
+
+ /**
+ * Returns the start of the file region referenced by this source split.
+ * The position is inclusive, the value indicates the first byte that is part of the split.
+ */
+ public long offset() {
+ return offset;
+ }
+
+ /**
+ * Returns the number of bytes in the file region described by this source split.
+ */
+ public long length() {
+ return length;
+ }
+
+ /**
+ * Gets the hostnames of the nodes storing the file range described by this split.
+ * The returned array is empty, if no host information is available.
+ *
+ * <p>Host information is typically only available on specific file systems, like HDFS.
+ */
+ public String[] hostnames() {
+ return hostnames;
+ }
+
+ /**
+ * Gets the (checkpointed) position of the reader, if set.
+ * This value is typically absent for splits when assigned from the enumerator to the readers,
+ * and present when the splits are recovered from a checkpoint.
+ */
+ public Optional<CheckpointedPosition> getReaderPosition() {
+ return Optional.ofNullable(readerPosition);
+ }
+
+ // ------------------------------------------------------------------------
+ // utils
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ final String hosts = hostnames.length == 0 ? "(no host info)" : " hosts=" + Arrays.toString(hostnames);
+ return String.format("FileSourceSplit: %s [%d, %d) %s ID=%s position=%s",
+ filePath, offset, offset + length, hosts, id, readerPosition);
+ }
+
+ private static void checkNoNullHosts(String[] hosts) {
+ checkNotNull(hosts, "hostnames array must not be null");
+ for (String host : hosts) {
+ checkArgument(host != null, "the hostnames must not contain null entries");
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
new file mode 100644
index 0000000..1674230
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitSerializer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A serializer for the {@link FileSourceSplit}.
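+ *
+ * <p>Round-trip sketch:
+ * <pre>{@code
+ * byte[] bytes = FileSourceSplitSerializer.INSTANCE.serialize(split);
+ * FileSourceSplit copy = FileSourceSplitSerializer.INSTANCE.deserialize(
+ *         FileSourceSplitSerializer.INSTANCE.getVersion(), bytes);
+ * }</pre>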
+ */
+@PublicEvolving
+public final class FileSourceSplitSerializer implements SimpleVersionedSerializer<FileSourceSplit> {
+
+ public static final FileSourceSplitSerializer INSTANCE = new FileSourceSplitSerializer();
+
+ private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+ ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
+
+ private static final int VERSION = 1;
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(FileSourceSplit split) throws IOException {
+ checkArgument(split.getClass() == FileSourceSplit.class, "Cannot serialize subclasses of FileSourceSplit");
+
+ // optimization: the splits lazily cache their own serialized form
+ if (split.serializedFormCache != null) {
+ return split.serializedFormCache;
+ }
+
+ final DataOutputSerializer out = SERIALIZER_CACHE.get();
+
+ out.writeUTF(split.splitId());
+ split.path().write(out);
+ out.writeLong(split.offset());
+ out.writeLong(split.length());
+ writeStringArray(out, split.hostnames());
+
+ final Optional<CheckpointedPosition> readerPosition = split.getReaderPosition();
+ out.writeBoolean(readerPosition.isPresent());
+ if (readerPosition.isPresent()) {
+ out.writeLong(readerPosition.get().getOffset());
+ out.writeLong(readerPosition.get().getRecordsAfterOffset());
+ }
+
+ final byte[] result = out.getCopyOfBuffer();
+ out.clear();
+
+ // optimization: cache the serialized form, so we avoid the byte work during repeated serialization
+ split.serializedFormCache = result;
+
+ return result;
+ }
+
+ @Override
+ public FileSourceSplit deserialize(int version, byte[] serialized) throws IOException {
+ if (version == 1) {
+ return deserializeV1(serialized);
+ }
+ throw new IOException("Unknown version: " + version);
+ }
+
+ private static FileSourceSplit deserializeV1(byte[] serialized) throws IOException {
+ final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+ final String id = in.readUTF();
+ final Path path = new Path();
+ path.read(in);
+ final long offset = in.readLong();
+ final long len = in.readLong();
+ final String[] hosts = readStringArray(in);
+
+ final CheckpointedPosition readerPosition = in.readBoolean()
+ ? new CheckpointedPosition(in.readLong(), in.readLong()) : null;
+
+ // instantiate a new split and cache the serialized form
+ return new FileSourceSplit(id, path, offset, len, hosts, readerPosition, serialized);
+ }
+
+ private static void writeStringArray(DataOutputView out, String[] strings) throws IOException {
+ out.writeInt(strings.length);
+ for (String string : strings) {
+ out.writeUTF(string);
+ }
+ }
+
+ private static String[] readStringArray(DataInputView in) throws IOException {
+ final int len = in.readInt();
+ final String[] strings = new String[len];
+ for (int i = 0; i < len; i++) {
+ strings[i] = in.readUTF();
+ }
+ return strings;
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java
new file mode 100644
index 0000000..e1f6b68
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSourceSplitState.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * State of the reader, essentially a mutable version of the {@link FileSourceSplit}.
+ * Has a modifiable offset and records-to-skip-count.
+ *
+ * <p>The {@link FileSourceSplit} assigned to the reader or stored in the checkpoint points to the
+ * position from where to start reading (after recovery), so the current offset and records-to-skip
+ * need to always point to the record after the last emitted record.
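+ *
+ * <p>A typical record-emitter interaction looks like this (a sketch; {@code RecordAndPosition}
+ * accessors as used elsewhere in this commit):
+ * <pre>{@code
+ * state.setPosition(record.getOffset(), record.getRecordSkipCount());
+ * FileSourceSplit checkpointed = state.toFileSourceSplit();
+ * }</pre>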
+ */
+@PublicEvolving
+public final class FileSourceSplitState {
+
+ private final FileSourceSplit split;
+
+ private long offset;
+
+ private long recordsToSkipAfterOffset;
+
+ public FileSourceSplitState(FileSourceSplit split) {
+ this.split = checkNotNull(split);
+
+ final Optional<CheckpointedPosition> readerPosition = split.getReaderPosition();
+ if (readerPosition.isPresent()) {
+ this.offset = readerPosition.get().getOffset();
+ this.recordsToSkipAfterOffset = readerPosition.get().getRecordsAfterOffset();
+ } else {
+ this.offset = CheckpointedPosition.NO_OFFSET;
+ this.recordsToSkipAfterOffset = 0L;
+ }
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getRecordsToSkipAfterOffset() {
+ return recordsToSkipAfterOffset;
+ }
+
+ public void setOffset(long offset) {
+ // we skip sanity / boundary checks here for efficiency.
+ // illegal boundaries will eventually be caught when constructing the split on checkpoint.
+ this.offset = offset;
+ }
+
+ public void setRecordsToSkipAfterOffset(long recordsToSkipAfterOffset) {
+ // we skip sanity / boundary checks here for efficiency.
+ // illegal boundaries will eventually be caught when constructing the split on checkpoint.
+ this.recordsToSkipAfterOffset = recordsToSkipAfterOffset;
+ }
+
+ public void setPosition(long offset, long recordsToSkipAfterOffset) {
+ // we skip sanity / boundary checks here for efficiency.
+ // illegal boundaries will eventually be caught when constructing the split on checkpoint.
+ this.offset = offset;
+ this.recordsToSkipAfterOffset = recordsToSkipAfterOffset;
+ }
+
+ public void setPosition(CheckpointedPosition position) {
+ this.offset = position.getOffset();
+ this.recordsToSkipAfterOffset = position.getRecordsAfterOffset();
+ }
+
+ /**
+ * Creates a new {@code FileSourceSplit} from this state, using the current position as the split's
+ * checkpointed reader position.
+ */
+ public FileSourceSplit toFileSourceSplit() {
+ final CheckpointedPosition position =
+ (offset == CheckpointedPosition.NO_OFFSET && recordsToSkipAfterOffset == 0) ?
+ null : new CheckpointedPosition(offset, recordsToSkipAfterOffset);
+
+ return new FileSourceSplit(
+ split.splitId(),
+ split.path(),
+ split.offset(),
+ split.length(),
+ split.hostnames(),
+ position);
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
new file mode 100644
index 0000000..1515690
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A checkpoint of the enumerator's current state, containing the currently pending splits that are not yet assigned.
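+ *
+ * <p>Instances are created via the factory methods below, for example:
+ * <pre>{@code
+ * PendingSplitsCheckpoint checkpoint =
+ *         PendingSplitsCheckpoint.fromCollectionSnapshot(splits, alreadyProcessedPaths);
+ * }</pre>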
+ */
+@PublicEvolving
+public final class PendingSplitsCheckpoint {
+
+ /** The splits in the checkpoint. */
+ private final Collection<FileSourceSplit> splits;
+
+ /** The paths that are no longer in the enumerator checkpoint, but have been processed
+ * before and should thus be ignored. Relevant only for sources in continuous monitoring mode. */
+ private final Collection<Path> alreadyProcessedPaths;
+
+ /** The cached byte representation from the last serialization step. This helps to avoid
+ * paying repeated serialization cost for the same checkpoint object. This field is used
+ * by {@link PendingSplitsCheckpointSerializer}. */
+ @Nullable
+ byte[] serializedFormCache;
+
+ private PendingSplitsCheckpoint(Collection<FileSourceSplit> splits, Collection<Path> alreadyProcessedPaths) {
+ this.splits = Collections.unmodifiableCollection(splits);
+ this.alreadyProcessedPaths = Collections.unmodifiableCollection(alreadyProcessedPaths);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public Collection<FileSourceSplit> getSplits() {
+ return splits;
+ }
+
+ public Collection<Path> getAlreadyProcessedPaths() {
+ return alreadyProcessedPaths;
+ }
+
+ // ------------------------------------------------------------------------
+ // factories
+ // ------------------------------------------------------------------------
+
+ public static PendingSplitsCheckpoint fromCollectionSnapshot(Collection<FileSourceSplit> splits) {
+ checkNotNull(splits);
+
+ // create a copy of the collection to make sure this checkpoint is immutable
+ final Collection<FileSourceSplit> copy = new ArrayList<>(splits);
+ return new PendingSplitsCheckpoint(copy, Collections.emptySet());
+ }
+
+ public static PendingSplitsCheckpoint fromCollectionSnapshot(
+ Collection<FileSourceSplit> splits,
+ Collection<Path> alreadyProcessedPaths) {
+ checkNotNull(splits);
+
+ // create a copy of the collection to make sure this checkpoint is immutable
+ final Collection<FileSourceSplit> splitsCopy = new ArrayList<>(splits);
+ final Collection<Path> pathsCopy = new ArrayList<>(alreadyProcessedPaths);
+
+ return new PendingSplitsCheckpoint(splitsCopy, pathsCopy);
+ }
+
+ static PendingSplitsCheckpoint reusingCollection(Collection<FileSourceSplit> splits, Collection<Path> alreadyProcessedPaths) {
+ return new PendingSplitsCheckpoint(splits, alreadyProcessedPaths);
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
new file mode 100644
index 0000000..e87ca7d
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A serializer for the {@link PendingSplitsCheckpoint}.
+ */
+@PublicEvolving
+public final class PendingSplitsCheckpointSerializer implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {
+
+ public static final PendingSplitsCheckpointSerializer INSTANCE = new PendingSplitsCheckpointSerializer();
+
+ private static final int VERSION = 1;
+
+ private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException {
+ checkArgument(checkpoint.getClass() == PendingSplitsCheckpoint.class,
+ "Cannot serialize subclasses of PendingSplitsCheckpoint");
+
+ // optimization: the checkpoint lazily caches its own serialized form
+ if (checkpoint.serializedFormCache != null) {
+ return checkpoint.serializedFormCache;
+ }
+
+ final FileSourceSplitSerializer serializer = FileSourceSplitSerializer.INSTANCE;
+ final Collection<FileSourceSplit> splits = checkpoint.getSplits();
+ final Collection<Path> processedPaths = checkpoint.getAlreadyProcessedPaths();
+
+ final ArrayList<byte[]> serializedSplits = new ArrayList<>(splits.size());
+ final ArrayList<byte[]> serializedPaths = new ArrayList<>(processedPaths.size());
+
+ int totalLen = 16; // four ints: magic, version, count splits, count paths
+
+ for (FileSourceSplit split : splits) {
+ final byte[] serSplit = serializer.serialize(split);
+ serializedSplits.add(serSplit);
+ totalLen += serSplit.length + 4; // 4 bytes for the length field
+ }
+
+ for (Path path : processedPaths) {
+ final byte[] serPath = path.toString().getBytes(StandardCharsets.UTF_8);
+ serializedPaths.add(serPath);
+ totalLen += serPath.length + 4; // 4 bytes for the length field
+ }
+
+ final byte[] result = new byte[totalLen];
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
+ byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
+ byteBuffer.putInt(serializer.getVersion());
+ byteBuffer.putInt(serializedSplits.size());
+ byteBuffer.putInt(serializedPaths.size());
+
+ for (byte[] splitBytes : serializedSplits) {
+ byteBuffer.putInt(splitBytes.length);
+ byteBuffer.put(splitBytes);
+ }
+
+ for (byte[] pathBytes : serializedPaths) {
+ byteBuffer.putInt(pathBytes.length);
+ byteBuffer.put(pathBytes);
+ }
+
+ assert byteBuffer.remaining() == 0;
+
+ // optimization: cache the serialized form, so we avoid the byte work during repeated serialization
+ checkpoint.serializedFormCache = result;
+
+ return result;
+ }
+
+ @Override
+ public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) throws IOException {
+ if (version == 1) {
+ return deserializeV1(serialized);
+ }
+ throw new IOException("Unknown version: " + version);
+ }
+
+ private static PendingSplitsCheckpoint deserializeV1(byte[] serialized) throws IOException {
+ final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
+
+ final int magic = bb.getInt();
+ if (magic != VERSION_1_MAGIC_NUMBER) {
+ throw new IOException(String.format("Invalid magic number for PendingSplitsCheckpoint. " +
+ "Expected: %X, found: %X", VERSION_1_MAGIC_NUMBER, magic));
+ }
+
+ final int version = bb.getInt();
+ final int numSplits = bb.getInt();
+ final int numPaths = bb.getInt();
+
+ final FileSourceSplitSerializer serializer = FileSourceSplitSerializer.INSTANCE;
+ final ArrayList<FileSourceSplit> splits = new ArrayList<>(numSplits);
+ final ArrayList<Path> paths = new ArrayList<>(numPaths);
+
+ for (int remaining = numSplits; remaining > 0; remaining--) {
+ final byte[] bytes = new byte[bb.getInt()];
+ bb.get(bytes);
+ final FileSourceSplit split = serializer.deserialize(version, bytes);
+ splits.add(split);
+ }
+
+ for (int remaining = numPaths; remaining > 0; remaining--) {
+ final byte[] bytes = new byte[bb.getInt()];
+ bb.get(bytes);
+ final Path path = new Path(new String(bytes, StandardCharsets.UTF_8));
+ paths.add(path);
+ }
+
+ return PendingSplitsCheckpoint.reusingCollection(splits, paths);
+ }
+}
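
Continuing the sketch above, a checkpoint round-trips through this serializer as follows (both calls may throw IOException); note that a second serialize() call returns the byte array cached on the checkpoint:

    PendingSplitsCheckpointSerializer serializer = PendingSplitsCheckpointSerializer.INSTANCE;

    byte[] bytes = serializer.serialize(checkpoint);
    PendingSplitsCheckpoint restored = serializer.deserialize(serializer.getVersion(), bytes);

    // served from checkpoint.serializedFormCache, no re-encoding work
    byte[] again = serializer.serialize(checkpoint);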
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java
new file mode 100644
index 0000000..9182b61
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/FileSplitAssigner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code FileSplitAssigner} is responsible for deciding what split should be processed next by
+ * which node. It determines split processing order and locality.
+ */
+@PublicEvolving
+public interface FileSplitAssigner {
+
+ /**
+ * Gets the next split.
+ *
+ * <p>When this method returns an empty {@code Optional}, then the set of splits is
+ * assumed to be done and the source will finish once the readers have finished their
+ * current splits.
+ */
+ Optional<FileSourceSplit> getNext(@Nullable String hostname);
+
+ /**
+ * Adds a set of splits to this assigner. This happens, for example, when the processing of
+ * some splits failed and the splits need to be re-added, or when new splits were discovered.
+ */
+ void addSplits(Collection<FileSourceSplit> splits);
+
+ /**
+ * Gets the remaining splits that this assigner has pending.
+ */
+ Collection<FileSourceSplit> remainingSplits();
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory for the {@code FileSplitAssigner}, to allow the {@code FileSplitAssigner} to be
+ * lazily initialized and to not have to be serializable.
+ */
+ @FunctionalInterface
+ interface Provider extends Serializable {
+
+ /**
+ * Creates a new {@code FileSplitAssigner} that starts with the given set of initial splits.
+ */
+ FileSplitAssigner create(Collection<FileSourceSplit> initialSplits);
+ }
+}
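
Because only the Provider is serialized into the source, the assigner itself may hold arbitrary non-serializable state. A sketch using the SimpleSplitAssigner from the next file ('initialSplits' is an assumed, pre-computed collection of splits):

    FileSplitAssigner.Provider assignerProvider = SimpleSplitAssigner::new;

    // created on the enumerator at runtime, never serialized itself
    FileSplitAssigner assigner = assignerProvider.create(initialSplits);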
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java
new file mode 100644
index 0000000..ed52f97
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/assigners/SimpleSplitAssigner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * The {@code SimpleSplitAssigner} hands out splits in an arbitrary order, without any
+ * consideration for locality.
+ */
+@PublicEvolving
+public class SimpleSplitAssigner implements FileSplitAssigner {
+
+ private final ArrayList<FileSourceSplit> splits;
+
+ public SimpleSplitAssigner(Collection<FileSourceSplit> splits) {
+ this.splits = new ArrayList<>(splits);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public Optional<FileSourceSplit> getNext(@Nullable String hostname) {
+ final int size = splits.size();
+ return size == 0 ? Optional.empty() : Optional.of(splits.remove(size - 1));
+ }
+
+ @Override
+ public void addSplits(Collection<FileSourceSplit> newSplits) {
+ splits.addAll(newSplits);
+ }
+
+ @Override
+ public Collection<FileSourceSplit> remainingSplits() {
+ return splits;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return "SimpleSplitAssigner " + splits;
+ }
+}
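
A sketch of draining the assigner; SimpleSplitAssigner ignores the hostname hint, so passing null is fine here ('initialSplits' is again an assumed collection):

    FileSplitAssigner assigner = new SimpleSplitAssigner(initialSplits);

    Optional<FileSourceSplit> next;
    while ((next = assigner.getNext(null)).isPresent()) {
        // hand next.get() to a requesting reader
    }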
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java
new file mode 100644
index 0000000..14a732e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/compression/StandardDeCompressors.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.compression;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory;
+import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.XZInputStreamFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A collection of common compression formats and de-compressors.
+ */
+@PublicEvolving
+public final class StandardDeCompressors {
+
+ /** All supported file compression formats, by common file extensions. */
+ private static final Map<String, InflaterInputStreamFactory<?>> DECOMPRESSORS =
+ buildDecompressorMap(
+ DeflateInflaterInputStreamFactory.getInstance(),
+ GzipInflaterInputStreamFactory.getInstance(),
+ Bzip2InputStreamFactory.getInstance(),
+ XZInputStreamFactory.getInstance());
+
+ /** All common file extensions of supported file compression formats. */
+ private static final Collection<String> COMMON_SUFFIXES =
+ Collections.unmodifiableList(new ArrayList<>(DECOMPRESSORS.keySet()));
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets all common file extensions of supported file compression formats.
+ */
+ public static Collection<String> getCommonSuffixes() {
+ return COMMON_SUFFIXES;
+ }
+
+ /**
+ * Gets the decompressor for a file extension. Returns null if there is no decompressor for this
+ * file extension.
+ */
+ @Nullable
+ public static InflaterInputStreamFactory<?> getDecompressorForExtension(String extension) {
+ return DECOMPRESSORS.get(extension);
+ }
+
+ /**
+ * Gets the decompressor for a file name. This checks the file against all known and supported
+ * file extensions.
+ * Returns null if there is no decompressor for this file name.
+ */
+ @Nullable
+ public static InflaterInputStreamFactory<?> getDecompressorForFileName(String fileName) {
+ for (final Map.Entry<String, InflaterInputStreamFactory<?>> entry : DECOMPRESSORS.entrySet()) {
+ if (fileName.endsWith(entry.getKey())) {
+ return entry.getValue();
+ }
+ }
+ return null;
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static Map<String, InflaterInputStreamFactory<?>> buildDecompressorMap(
+ final InflaterInputStreamFactory<?>... decompressors) {
+
+ final LinkedHashMap<String, InflaterInputStreamFactory<?>> map = new LinkedHashMap<>(decompressors.length);
+ for (InflaterInputStreamFactory<?> decompressor : decompressors) {
+ for (String suffix : decompressor.getCommonFileExtensions()) {
+ map.put(suffix, decompressor);
+ }
+ }
+ return map;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** This class has purely static utility methods and is not meant to be instantiated. */
+ private StandardDeCompressors() {}
+}
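
A sketch of the lookup (the file name is illustrative); a null result means the name carries no known compression suffix and the file can be read as-is:

    InflaterInputStreamFactory<?> factory =
            StandardDeCompressors.getDecompressorForFileName("events.log.gz");

    if (factory == null) {
        // no known compression suffix: read the file directly
    }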
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java
new file mode 100644
index 0000000..f475a68
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumerator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively,
+ * and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File
+ * systems that do not expose block information will not create multiple splits per file, but
+ * keep each file as a single source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2', ...)
+ * will not be split. See {@link StandardDeCompressors} for a list of known formats and suffixes.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file prefixes
+ * '.' and '_'. A custom file filter can be specified.
+ */
+@PublicEvolving
+public class BlockSplittingRecursiveEnumerator extends NonSplittingRecursiveEnumerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BlockSplittingRecursiveEnumerator.class);
+
+ private final String[] nonSplittableFileSuffixes;
+
+ /**
+ * Creates a new enumerator that enumerates all files except hidden files.
+ * Hidden files are considered files where the filename starts with '.' or with '_'.
+ *
+ * <p>The enumerator does not split files that have a suffix corresponding to a known
+ * compression format (for example '.gzip', '.bz2', '.xz', '.zip', ...).
+ * See {@link StandardDeCompressors} for details.
+ */
+ public BlockSplittingRecursiveEnumerator() {
+ this(new DefaultFileFilter(), StandardDeCompressors.getCommonSuffixes().toArray(new String[0]));
+ }
+
+ /**
+ * Creates a new enumerator that uses the given predicate as a filter
+ * for file paths, and avoids splitting files with the given suffixes (typically
+ * to avoid splitting compressed files).
+ */
+ public BlockSplittingRecursiveEnumerator(
+ final Predicate<Path> fileFilter,
+ final String[] nonSplittableFileSuffixes) {
+ super(fileFilter);
+ this.nonSplittableFileSuffixes = checkNotNull(nonSplittableFileSuffixes);
+ }
+
+ @Override
+ protected void convertToSourceSplits(
+ final FileStatus file,
+ final FileSystem fs,
+ final List<FileSourceSplit> target) throws IOException {
+
+ if (!isFileSplittable(file.getPath())) {
+ super.convertToSourceSplits(file, fs, target);
+ return;
+ }
+
+ final BlockLocation[] blocks = getBlockLocationsForFile(file, fs);
+ if (blocks == null) {
+ target.add(new FileSourceSplit(getNextId(), file.getPath(), 0L, file.getLen()));
+ } else {
+ for (BlockLocation block : blocks) {
+ target.add(new FileSourceSplit(
+ getNextId(),
+ file.getPath(),
+ block.getOffset(),
+ block.getLength(),
+ block.getHosts()));
+ }
+ }
+ }
+
+ protected boolean isFileSplittable(Path filePath) {
+ if (nonSplittableFileSuffixes.length == 0) {
+ return true;
+ }
+
+ final String path = filePath.getPath();
+ for (String suffix : nonSplittableFileSuffixes) {
+ if (path.endsWith(suffix)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Nullable
+ private static BlockLocation[] getBlockLocationsForFile(FileStatus file, FileSystem fs) throws IOException {
+ final long len = file.getLen();
+
+ final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len);
+ if (blocks == null || blocks.length == 0) {
+ return null;
+ }
+
+ // A cheap check whether we have all blocks. We don't check whether the blocks fully cover
+ // the file (too expensive), but we make some sanity checks to catch the common cases early
+ // where incorrect block info is returned by the implementation.
+
+ long totalLen = 0L;
+ for (BlockLocation block : blocks) {
+ totalLen += block.getLength();
+ }
+ if (totalLen != len) {
+ LOG.warn("Block lengths do not match file length for {}. File length is {}, blocks are {}",
+ file.getPath(), len, Arrays.toString(blocks));
+ return null;
+ }
+
+ return blocks;
+ }
+}
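
A sketch of a custom instantiation that keeps the default hidden-file filter (defined in the next file) but treats only '.gz' and '.zip' files as non-splittable:

    FileEnumerator enumerator = new BlockSplittingRecursiveEnumerator(
            new DefaultFileFilter(),
            new String[] { ".gz", ".zip" });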
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java
new file mode 100644
index 0000000..7ac7d46
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/DefaultFileFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.util.function.Predicate;
+
+/**
+ * A file filter that filters out hidden files based on common naming patterns,
+ * i.e., files where the filename starts with '.' or with '_'.
+ */
+@PublicEvolving
+public final class DefaultFileFilter implements Predicate<Path> {
+
+ @Override
+ public boolean test(Path path) {
+ final String fileName = path.getName();
+ if (fileName == null || fileName.length() == 0) {
+ return true;
+ }
+ final char first = fileName.charAt(0);
+ return first != '.' && first != '_';
+ }
+}
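
Since the filter is a plain java.util.function.Predicate, it composes. A sketch that keeps the hidden-file rule and additionally admits only '.csv' files, fed into the NonSplittingRecursiveEnumerator defined below:

    Predicate<Path> csvOnly =
            new DefaultFileFilter().and(path -> path.getName().endsWith(".csv"));

    FileEnumerator enumerator = new NonSplittingRecursiveEnumerator(csvOnly);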
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java
new file mode 100644
index 0000000..67c2790
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/FileEnumerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The {@code FileEnumerator}'s task is to discover all files to be read and to split them into
+ * a set of {@link FileSourceSplit}.
+ *
+ * <p>This possibly includes path traversal, file filtering (by name or other patterns),
+ * and deciding whether and how to split files into multiple splits.
+ */
+@PublicEvolving
+public interface FileEnumerator {
+
+ /**
+ * Generates all file splits for the relevant files under the given paths.
+ * The {@code minDesiredSplits} is an optional hint indicating how many splits would be necessary
+ * to exploit parallelism properly.
+ */
+ Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Factory for the {@code FileEnumerator}, to allow the {@code FileEnumerator} to be
+ * lazily initialized and to not have to be serializable.
+ */
+ @FunctionalInterface
+ interface Provider extends Serializable {
+
+ FileEnumerator create();
+ }
+}
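
As with the split assigner, only the Provider is serialized into the source, so a constructor reference is usually sufficient (again using the enumerator from the next file):

    FileEnumerator.Provider enumeratorProvider = NonSplittingRecursiveEnumerator::new;
    FileEnumerator enumerator = enumeratorProvider.create();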
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java
new file mode 100644
index 0000000..a9a06f3
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumerator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.function.Predicate;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively.
+ * Each file becomes one split; this enumerator does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file prefixes
+ * '.' and '_'. A custom file filter can be specified.
+ */
+@PublicEvolving
+public class NonSplittingRecursiveEnumerator implements FileEnumerator {
+
+ /** The filter predicate to filter out unwanted files. */
+ private final Predicate<Path> fileFilter;
+
+ /** The current ID as a mutable string representation. This covers more values than the
+ * integer value range, so we should never overflow. */
+ private final char[] currentId = "0000000000".toCharArray();
+
+ /**
+ * Creates a NonSplittingRecursiveEnumerator that enumerates all files except hidden files.
+ * Hidden files are considered files where the filename starts with '.' or with '_'.
+ */
+ public NonSplittingRecursiveEnumerator() {
+ this(new DefaultFileFilter());
+ }
+
+ /**
+ * Creates a NonSplittingRecursiveEnumerator that uses the given predicate as a filter
+ * for file paths.
+ */
+ public NonSplittingRecursiveEnumerator(Predicate<Path> fileFilter) {
+ this.fileFilter = checkNotNull(fileFilter);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException {
+ final ArrayList<FileSourceSplit> splits = new ArrayList<>();
+
+ for (Path path : paths) {
+ final FileSystem fs = path.getFileSystem();
+ final FileStatus status = fs.getFileStatus(path);
+ addSplitsForPath(status, fs, splits);
+ }
+
+ return splits;
+ }
+
+ private void addSplitsForPath(FileStatus fileStatus, FileSystem fs, ArrayList<FileSourceSplit> target) throws IOException {
+ if (!fileFilter.test(fileStatus.getPath())) {
+ return;
+ }
+
+ if (!fileStatus.isDir()) {
+ convertToSourceSplits(fileStatus, fs, target);
+ return;
+ }
+
+ final FileStatus[] containedFiles = fs.listStatus(fileStatus.getPath());
+ for (FileStatus containedStatus : containedFiles) {
+ addSplitsForPath(containedStatus, fs, target);
+ }
+ }
+
+ protected void convertToSourceSplits(
+ final FileStatus file,
+ final FileSystem fs,
+ final List<FileSourceSplit> target) throws IOException {
+
+ final String[] hosts = getHostsFromBlockLocations(fs.getFileBlockLocations(file, 0L, file.getLen()));
+ target.add(new FileSourceSplit(getNextId(), file.getPath(), 0, file.getLen(), hosts));
+ }
+
+ protected final String getNextId() {
+ // Because we just increment numbers, we increment the char representation directly,
+ // rather than incrementing an integer and converting it to a string representation
+ // each time (which requires fairly expensive conversion logic).
+ incrementCharArrayByOne(currentId, currentId.length - 1);
+ return new String(currentId);
+ }
+
+ private static String[] getHostsFromBlockLocations(BlockLocation[] blockLocations) throws IOException {
+ if (blockLocations.length == 0) {
+ return StringUtils.EMPTY_STRING_ARRAY;
+ }
+ if (blockLocations.length == 1) {
+ return blockLocations[0].getHosts();
+ }
+ final LinkedHashSet<String> allHosts = new LinkedHashSet<>();
+ for (BlockLocation block : blockLocations) {
+ allHosts.addAll(Arrays.asList(block.getHosts()));
+ }
+ return allHosts.toArray(new String[allHosts.size()]);
+ }
+
+ private static void incrementCharArrayByOne(char[] array, int pos) {
+ char c = array[pos];
+ c++;
+
+ if (c > '9') {
+ c = '0';
+ incrementCharArrayByOne(array, pos - 1);
+ }
+ array[pos] = c;
+ }
+}
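
A sketch of a full enumeration pass (the directory is illustrative, and enumerateSplits may throw IOException); note that this implementation ignores the minDesiredSplits hint:

    FileEnumerator enumerator = new NonSplittingRecursiveEnumerator();
    Collection<FileSourceSplit> splits =
            enumerator.enumerateSplits(new Path[] { new Path("file:///tmp/input") }, 4);

    // every non-hidden file under /tmp/input becomes exactly one split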
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
new file mode 100644
index 0000000..72fa0ad
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A continuously monitoring enumerator: it periodically re-enumerates the configured paths
+ * and assigns splits for newly discovered files to the readers.
+ */
+@Internal
+public class ContinuousFileSplitEnumerator implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
+
+ private final SplitEnumeratorContext<FileSourceSplit> context;
+
+ private final FileSplitAssigner splitAssigner;
+
+ private final FileEnumerator enumerator;
+
+ private final HashSet<Path> pathsAlreadyProcessed;
+
+ private final LinkedHashMap<Integer, String> readersAwaitingSplit;
+
+ private final Path[] paths;
+
+ private final long discoveryInterval;
+
+ // ------------------------------------------------------------------------
+
+ public ContinuousFileSplitEnumerator(
+ SplitEnumeratorContext<FileSourceSplit> context,
+ FileEnumerator enumerator,
+ FileSplitAssigner splitAssigner,
+ Path[] paths,
+ Collection<Path> alreadyDiscoveredPaths,
+ long discoveryInterval) {
+
+ checkArgument(discoveryInterval > 0L);
+ this.context = checkNotNull(context);
+ this.enumerator = checkNotNull(enumerator);
+ this.splitAssigner = checkNotNull(splitAssigner);
+ this.paths = paths;
+ this.discoveryInterval = discoveryInterval;
+ this.pathsAlreadyProcessed = new HashSet<>(alreadyDiscoveredPaths);
+ this.readersAwaitingSplit = new LinkedHashMap<>();
+ }
+
+ @Override
+ public void start() {
+ context.callAsync(
+ () -> enumerator.enumerateSplits(paths, 1),
+ this::processDiscoveredSplits,
+ discoveryInterval, discoveryInterval);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no resources to close
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ // this source is purely lazy-pull-based, nothing to do upon registration
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof RequestSplitEvent) {
+ readersAwaitingSplit.put(subtaskId, ((RequestSplitEvent) sourceEvent).hostName());
+ assignSplits();
+ }
+ else {
+ LOG.error("Received unrecognized event: {}", sourceEvent);
+ }
+ }
+
+ @Override
+ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+ LOG.debug("File Source Enumerator adds splits back: {}", splits);
+ splitAssigner.addSplits(splits);
+ }
+
+ @Override
+ public PendingSplitsCheckpoint snapshotState() throws Exception {
+ return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits(), pathsAlreadyProcessed);
+ }
+
+ // ------------------------------------------------------------------------
+
+ private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error) {
+ if (error != null) {
+ LOG.error("Failed to enumerate files", error);
+ return;
+ }
+
+ final Collection<FileSourceSplit> newSplits = splits.stream()
+ .filter((split) -> pathsAlreadyProcessed.add(split.path()))
+ .collect(Collectors.toList());
+ splitAssigner.addSplits(newSplits);
+
+ assignSplits();
+ }
+
+ private void assignSplits() {
+ final Iterator<Map.Entry<Integer, String>> awaitingReader = readersAwaitingSplit.entrySet().iterator();
+
+ while (awaitingReader.hasNext()) {
+ final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
+ final String hostname = nextAwaiting.getValue();
+ final int awaitingSubtask = nextAwaiting.getKey();
+ final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
+ if (nextSplit.isPresent()) {
+ context.assignSplit(nextSplit.get(), awaitingSubtask);
+ awaitingReader.remove();
+ } else {
+ break;
+ }
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java
new file mode 100644
index 0000000..e084926
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapter.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.IteratorResultIterator;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@code FileRecordFormatAdapter} turns a {@link FileRecordFormat} into a {@link BulkFormat}.
+ */
+@Internal
+public final class FileRecordFormatAdapter<T> implements BulkFormat<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileRecordFormat<T> fileFormat;
+
+ public FileRecordFormatAdapter(FileRecordFormat<T> fileFormat) {
+ this.fileFormat = checkNotNull(fileFormat);
+ }
+
+ @Override
+ public BulkFormat.Reader<T> createReader(
+ final Configuration config,
+ final Path filePath,
+ final long splitOffset,
+ final long splitLength) throws IOException {
+
+ final FileRecordFormat.Reader<T> reader = fileFormat.createReader(config, filePath, splitOffset, splitLength);
+ return doWithCleanupOnException(reader, () -> {
+ //noinspection CodeBlock2Expr
+ return wrapReader(reader, config, CheckpointedPosition.NO_OFFSET, 0L);
+ });
+ }
+
+ @Override
+ public BulkFormat.Reader<T> restoreReader(
+ final Configuration config,
+ final Path filePath,
+ final long splitOffset,
+ final long splitLength,
+ final CheckpointedPosition checkpointedPosition) throws IOException {
+
+ final FileRecordFormat.Reader<T> reader = checkpointedPosition.getOffset() == CheckpointedPosition.NO_OFFSET
+ ? fileFormat.createReader(config, filePath, splitOffset, splitLength)
+ : fileFormat.restoreReader(config, filePath, checkpointedPosition.getOffset(), splitOffset, splitLength);
+
+ return doWithCleanupOnException(reader, () -> {
+ long remaining = checkpointedPosition.getRecordsAfterOffset();
+ while (remaining > 0 && reader.read() != null) {
+ remaining--;
+ }
+
+ return wrapReader(reader, config, checkpointedPosition.getOffset(), checkpointedPosition.getRecordsAfterOffset());
+ });
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return fileFormat.isSplittable();
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return fileFormat.getProducedType();
+ }
+
+ private static <T> Reader<T> wrapReader(
+ final FileRecordFormat.Reader<T> reader,
+ final Configuration config,
+ final long startingOffset,
+ final long startingSkipCount) {
+
+ final int numRecordsPerBatch = config.get(FileRecordFormat.RECORDS_PER_FETCH);
+ return new Reader<>(reader, numRecordsPerBatch, startingOffset, startingSkipCount);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The reader adapter, from {@link FileRecordFormat.Reader} to {@link BulkFormat.Reader}.
+ */
+ public static final class Reader<T> implements BulkFormat.Reader<T> {
+
+ private final FileRecordFormat.Reader<T> reader;
+ private final int numRecordsPerBatch;
+
+ private long lastOffset;
+ private long lastRecordsAfterOffset;
+
+ Reader(
+ final FileRecordFormat.Reader<T> reader,
+ final int numRecordsPerBatch,
+ final long initialOffset,
+ final long initialSkipCount) {
+ checkArgument(numRecordsPerBatch > 0, "numRecordsPerBatch must be > 0");
+ this.reader = checkNotNull(reader);
+ this.numRecordsPerBatch = numRecordsPerBatch;
+ this.lastOffset = initialOffset;
+ this.lastRecordsAfterOffset = initialSkipCount;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<T> readBatch() throws IOException {
+ updateCheckpointablePosition();
+
+ final ArrayList<T> result = new ArrayList<>(numRecordsPerBatch);
+ T next;
+ int remaining = numRecordsPerBatch;
+ while (remaining-- > 0 && ((next = reader.read()) != null)) {
+ result.add(next);
+ }
+
+ if (result.isEmpty()) {
+ return null;
+ }
+
+ final RecordIterator<T> iter = new IteratorResultIterator<>(result.iterator(), lastOffset, lastRecordsAfterOffset);
+ lastRecordsAfterOffset += result.size();
+ return iter;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ private void updateCheckpointablePosition() {
+ final CheckpointedPosition position = reader.getCheckpointedPosition();
+ if (position != null) {
+ this.lastOffset = position.getOffset();
+ this.lastRecordsAfterOffset = position.getRecordsAfterOffset();
+ }
+ }
+ }
+}
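
A sketch of the adaptation, assuming 'recordFormat' is some FileRecordFormat<String> implementation; the per-fetch batch size comes from the FileRecordFormat.RECORDS_PER_FETCH option referenced above:

    BulkFormat<String> bulkFormat = new FileRecordFormatAdapter<>(recordFormat);

    // splittability and type information are forwarded from the wrapped format
    TypeInformation<String> producedType = bulkFormat.getProducedType();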
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java
new file mode 100644
index 0000000..b3d2157f
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileRecords.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A collection of records for one file split.
+ *
+ * <p>This is essentially a slim wrapper around the {@link BulkFormat.RecordIterator} that only
+ * adds information about the current split or about finished splits (to keep knowledge of
+ * current split IDs out of the reader formats).
+ */
+@Internal
+public final class FileRecords<T> implements RecordsWithSplitIds<RecordAndPosition<T>> {
+
+ @Nullable
+ private String splitId;
+
+ @Nullable
+ private BulkFormat.RecordIterator<T> recordsForSplitCurrent;
+
+ @Nullable
+ private final BulkFormat.RecordIterator<T> recordsForSplit;
+
+ private final Set<String> finishedSplits;
+
+ private FileRecords(
+ @Nullable String splitId,
+ @Nullable BulkFormat.RecordIterator<T> recordsForSplit,
+ Set<String> finishedSplits) {
+
+ this.splitId = splitId;
+ this.recordsForSplit = recordsForSplit;
+ this.finishedSplits = finishedSplits;
+ }
+
+ @Nullable
+ @Override
+ public String nextSplit() {
+ // move the split marker once (from the current value to null)
+ final String nextSplit = this.splitId;
+ this.splitId = null;
+
+ // move the iterator, from null to value (if first move) or to null (if second move)
+ this.recordsForSplitCurrent = nextSplit != null ? this.recordsForSplit : null;
+
+ return nextSplit;
+ }
+
+ @Nullable
+ @Override
+ public RecordAndPosition<T> nextRecordFromSplit() {
+ final BulkFormat.RecordIterator<T> recordsForSplit = this.recordsForSplitCurrent;
+ if (recordsForSplit != null) {
+ return recordsForSplit.next();
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
+ public void recycle() {
+ if (recordsForSplit != null) {
+ recordsForSplit.releaseBatch();
+ }
+ }
+
+ @Override
+ public Set<String> finishedSplits() {
+ return finishedSplits;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static <T> FileRecords<T> forRecords(
+ final String splitId,
+ final BulkFormat.RecordIterator<T> recordsForSplit) {
+ return new FileRecords<>(splitId, recordsForSplit, Collections.emptySet());
+ }
+
+ public static <T> FileRecords<T> finishedSplit(String splitId) {
+ return new FileRecords<>(null, null, Collections.singleton(splitId));
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
new file mode 100644
index 0000000..08c965e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.FileSourceSplitState;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import java.util.Collection;
+
+/**
+ * A {@link SourceReader} that reads records from {@link FileSourceSplit}s.
+ */
+@Internal
+public final class FileSourceReader<T>
+ extends SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T, FileSourceSplit, FileSourceSplitState> {
+
+ public FileSourceReader(SourceReaderContext readerContext, BulkFormat<T> readerFormat, Configuration config) {
+ super(
+ () -> new FileSourceSplitReader<>(config, readerFormat),
+ new FileSourceRecordEmitter<>(),
+ config,
+ readerContext);
+ }
+
+ @Override
+ public void start() {
+ requestSplit();
+ }
+
+ @Override
+ protected void onSplitFinished(Collection<String> finishedSplitIds) {
+ requestSplit();
+ }
+
+ @Override
+ protected FileSourceSplitState initializedState(FileSourceSplit split) {
+ return new FileSourceSplitState(split);
+ }
+
+ @Override
+ protected FileSourceSplit toSplitType(String splitId, FileSourceSplitState splitState) {
+ return splitState.toFileSourceSplit();
+ }
+
+ private void requestSplit() {
+ context.sendSourceEventToCoordinator(new RequestSplitEvent(context.getLocalHostName()));
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java
new file mode 100644
index 0000000..5f55159
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceRecordEmitter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.FileSourceSplitState;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+/**
+ * The {@link RecordEmitter} implementation for {@link FileSourceReader}.
+ *
+ * <p>This updates the {@link FileSourceSplit} for every emitted record.
+ * Because the {@link FileSourceSplit} points to the position from where to start reading (after
+ * recovery), the current offset and records-to-skip must always point to the record after the
+ * emitted record. For example, after emitting the third record of a batch that starts at offset
+ * 4096, the state becomes (offset 4096, skip 3): recovery re-opens the file at offset 4096 and
+ * skips the first three records.
+ */
+@Internal
+final class FileSourceRecordEmitter<T> implements RecordEmitter<RecordAndPosition<T>, T, FileSourceSplitState> {
+
+ @Override
+ public void emitRecord(
+ final RecordAndPosition<T> element,
+ final SourceOutput<T> output,
+ final FileSourceSplitState splitState) {
+
+ output.collect(element.getRecord());
+ splitState.setPosition(element.getOffset(), element.getRecordSkipCount());
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java
new file mode 100644
index 0000000..ed9159b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceSplitReader.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Optional;
+import java.util.Queue;
+
+/**
+ * The {@link SplitReader} implementation for the file source.
+ */
+@Internal
+final class FileSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, FileSourceSplit> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileSourceSplitReader.class);
+
+ private final Configuration config;
+ private final BulkFormat<T> readerFactory;
+
+ private final Queue<FileSourceSplit> splits;
+
+ @Nullable
+ private BulkFormat.Reader<T> currentReader;
+ @Nullable
+ private String currentSplitId;
+
+ public FileSourceSplitReader(Configuration config, BulkFormat<T> readerFactory) {
+ this.config = config;
+ this.readerFactory = readerFactory;
+ this.splits = new ArrayDeque<>();
+ }
+
+ @Override
+ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
+ checkSplitOrStartNext();
+
+ final BulkFormat.RecordIterator<T> nextBatch = currentReader.readBatch();
+ return nextBatch == null ? finishSplit() : FileRecords.forRecords(currentSplitId, nextBatch);
+ }
+
+ @Override
+ public void handleSplitsChanges(final SplitsChange<FileSourceSplit> splitChange) {
+ if (!(splitChange instanceof SplitsAddition)) {
+ throw new UnsupportedOperationException(String.format(
+ "The SplitChange type of %s is not supported.", splitChange.getClass()));
+ }
+
+ LOG.debug("Handling split change {}", splitChange);
+ splits.addAll(splitChange.splits());
+ }
+
+ @Override
+ public void wakeUp() {}
+
+ private void checkSplitOrStartNext() throws IOException {
+ if (currentReader != null) {
+ return;
+ }
+
+ final FileSourceSplit nextSplit = splits.poll();
+ if (nextSplit == null) {
+ throw new IOException("Cannot fetch from another split - no split remaining");
+ }
+
+ currentSplitId = nextSplit.splitId();
+
+ final Optional<CheckpointedPosition> position = nextSplit.getReaderPosition();
+ currentReader = position.isPresent()
+ ? readerFactory.restoreReader(config, nextSplit.path(), nextSplit.offset(), nextSplit.length(), position.get())
+ : readerFactory.createReader(config, nextSplit.path(), nextSplit.offset(), nextSplit.length());
+ }
+
+ private FileRecords<T> finishSplit() throws IOException {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+
+ final FileRecords<T> finishRecords = FileRecords.finishedSplit(currentSplitId);
+ currentSplitId = null;
+ return finishRecords;
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
new file mode 100644
index 0000000..05d7897
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
+import org.apache.flink.connector.base.source.event.RequestSplitEvent;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A SplitEnumerator implementation for bounded / batch {@link FileSource} input.
+ *
+ * <p>This enumerator takes all files that are present in the configured input directories and assigns
+ * them to the readers. Once all files are processed, the source is finished.
+ *
+ * <p>The implementation of this class is rather thin. The actual logic for creating the set of
+ * FileSourceSplits to process, and the logic to decide which reader gets what split, are in
+ * {@link FileEnumerator} and in {@link FileSplitAssigner}, respectively.
+ */
+@Internal
+public class StaticFileSplitEnumerator implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StaticFileSplitEnumerator.class);
+
+ private final SplitEnumeratorContext<FileSourceSplit> context;
+
+ private final FileSplitAssigner splitAssigner;
+
+ // ------------------------------------------------------------------------
+
+ public StaticFileSplitEnumerator(
+ SplitEnumeratorContext<FileSourceSplit> context,
+ FileSplitAssigner splitAssigner) {
+ this.context = checkNotNull(context);
+ this.splitAssigner = checkNotNull(splitAssigner);
+ }
+
+ @Override
+ public void start() {
+ // no resources to start
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no resources to close
+ }
+
+ @Override
+ public void addReader(int subtaskId) {
+ // this source is purely lazy-pull-based, nothing to do upon registration
+ }
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof RequestSplitEvent) {
+ final RequestSplitEvent requestSplitEvent = (RequestSplitEvent) sourceEvent;
+ assignNextEvents(subtaskId, requestSplitEvent.hostName());
+ }
+ else {
+ LOG.error("Received unrecognized event: {}", sourceEvent);
+ }
+ }
+
+ @Override
+ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+ LOG.debug("File Source Enumerator adds splits back: {}", splits);
+ splitAssigner.addSplits(splits);
+ }
+
+ @Override
+ public PendingSplitsCheckpoint snapshotState() {
+ return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
+ }
+
+ // ------------------------------------------------------------------------
+
+ private void assignNextEvents(int subtask, @Nullable String hostname) {
+ if (LOG.isInfoEnabled()) {
+ final String hostInfo = hostname == null ? "(no host locality info)" : "(on host '" + hostname + "')";
+ LOG.info("Subtask {} {} is requesting a file source split", subtask, hostInfo);
+ }
+
+ final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname);
+ if (nextSplit.isPresent()) {
+ final FileSourceSplit split = nextSplit.get();
+ context.assignSplit(split, subtask);
+ LOG.info("Assigned split to subtask {} : {}", subtask, split);
+ }
+ else {
+ context.sendEventToSourceReader(subtask, new NoMoreSplitsEvent());
+ LOG.info("No more splits available for subtask {}", subtask);
+ }
+ }
+}
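
The enumerator above is deliberately thin: file discovery lives in the FileEnumerator and split placement in the FileSplitAssigner. A minimal sketch of how the pieces compose (the enumerateSplits and SimpleSplitAssigner signatures are assumed from this commit; 'context' and 'parallelism' come from the surrounding Source#createEnumerator call):

    // Sketch only: enumerate all splits once, then serve them until exhausted.
    FileEnumerator enumerator = new NonSplittingRecursiveEnumerator();
    Collection<FileSourceSplit> splits =
            enumerator.enumerateSplits(new Path[] {inputDir}, parallelism);
    SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> splitEnumerator =
            new StaticFileSplitEnumerator(context, new SimpleSplitAssigner(splits));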
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java
new file mode 100644
index 0000000..e085a0e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.IteratorResultIterator;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.MathUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter to turn a {@link StreamFormat} into a {@link BulkFormat}.
+ */
+@Internal
+public final class StreamFormatAdapter<T> implements BulkFormat<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final StreamFormat<T> streamFormat;
+
+ public StreamFormatAdapter(StreamFormat<T> streamFormat) {
+ this.streamFormat = checkNotNull(streamFormat);
+ }
+
+ @Override
+ public BulkFormat.Reader<T> createReader(
+ final Configuration config,
+ final Path filePath,
+ final long splitOffset,
+ final long splitLength) throws IOException {
+
+ final TrackingFsDataInputStream trackingStream = openStream(filePath, config, splitOffset);
+
+ return doWithCleanupOnException(trackingStream, () -> {
+ final StreamFormat.Reader<T> streamReader = streamFormat.createReader(
+ config, trackingStream, trackingStream.getFileLength(), splitOffset + splitLength);
+ return new Reader<>(streamReader, trackingStream, CheckpointedPosition.NO_OFFSET, 0L);
+ });
+ }
+
+ @Override
+ public BulkFormat.Reader<T> restoreReader(
+ final Configuration config,
+ final Path filePath,
+ final long splitOffset,
+ final long splitLength,
+ final CheckpointedPosition checkpointedPosition) throws IOException {
+
+ final TrackingFsDataInputStream trackingStream = openStream(filePath, config, splitOffset);
+
+ return doWithCleanupOnException(trackingStream, () -> {
+ // if there never was a checkpointed offset yet, we need to initialize the reader like a fresh reader.
+ // see the JavaDocs on StreamFormat.restoreReader() for details
+ final StreamFormat.Reader<T> streamReader = checkpointedPosition.getOffset() == CheckpointedPosition.NO_OFFSET
+ ? streamFormat.createReader(
+ config, trackingStream, trackingStream.getFileLength(), splitOffset + splitLength)
+ : streamFormat.restoreReader(
+ config, trackingStream, checkpointedPosition.getOffset(),
+ trackingStream.getFileLength(), splitOffset + splitLength);
+
+ // skip the checkpointed number of records after the offset, but make sure we close the reader if something goes wrong
+ doWithCleanupOnException(streamReader, () -> {
+ long toSkip = checkpointedPosition.getRecordsAfterOffset();
+ while (toSkip > 0 && streamReader.read() != null) {
+ toSkip--;
+ }
+ });
+
+ return new Reader<>(
+ streamReader, trackingStream,
+ checkpointedPosition.getOffset(), checkpointedPosition.getRecordsAfterOffset());
+ });
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return streamFormat.isSplittable();
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return streamFormat.getProducedType();
+ }
+
+ private static TrackingFsDataInputStream openStream(
+ final Path file,
+ final Configuration config,
+ final long seekPosition) throws IOException {
+
+ final FileSystem fs = file.getFileSystem();
+ final long fileLength = fs.getFileStatus(file).getLen();
+
+ final int fetchSize = MathUtils.checkedDownCast(config.get(StreamFormat.FETCH_IO_SIZE).getBytes());
+ if (fetchSize <= 0) {
+ throw new IllegalConfigurationException(
+ String.format("The fetch size (%s) must be > 0, but is %d",
+ StreamFormat.FETCH_IO_SIZE.key(), fetchSize));
+ }
+
+ final FSDataInputStream inStream = fs.open(file);
+ return doWithCleanupOnException(inStream, () -> {
+ inStream.seek(seekPosition);
+ return new TrackingFsDataInputStream(inStream, fileLength, fetchSize);
+ });
+
+ }
+
+ // ----------------------------------------------------------------------------------
+
+ /**
+ * The reader adapter, from {@link StreamFormat.Reader} to {@link BulkFormat.Reader}.
+ */
+ public static final class Reader<T> implements BulkFormat.Reader<T> {
+
+ private final StreamFormat.Reader<T> reader;
+ private final TrackingFsDataInputStream stream;
+ private long lastOffset;
+ private long lastRecordsAfterOffset;
+
+ Reader(
+ final StreamFormat.Reader<T> reader,
+ final TrackingFsDataInputStream stream,
+ final long initialOffset,
+ final long initialSkipCount) {
+
+ this.reader = checkNotNull(reader);
+ this.stream = checkNotNull(stream);
+ this.lastOffset = initialOffset;
+ this.lastRecordsAfterOffset = initialSkipCount;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<T> readBatch() throws IOException {
+ updateCheckpointedPosition();
+ stream.newBatch();
+
+ final ArrayList<T> result = new ArrayList<>();
+ T next;
+ while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
+ result.add(next);
+ }
+
+ if (result.isEmpty()) {
+ return null;
+ }
+
+ final RecordIterator<T> iter = new IteratorResultIterator<>(
+ result.iterator(), lastOffset, lastRecordsAfterOffset);
+ lastRecordsAfterOffset += result.size();
+ return iter;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ reader.close();
+ } finally {
+ // this is just in case, to guard against resource leaks
+ IOUtils.closeQuietly(stream);
+ }
+ }
+
+ private void updateCheckpointedPosition() {
+ final CheckpointedPosition position = reader.getCheckpointedPosition();
+ if (position != null) {
+ this.lastOffset = position.getOffset();
+ this.lastRecordsAfterOffset = position.getRecordsAfterOffset();
+ }
+ }
+ }
+
+ // ----------------------------------------------------------------------------------
+
+ /**
+ * Utility stream that tracks how much has been read. This is used to decide when the reader
+ * should finish the current batch and start the next batch. That way we make the batch sizes
+ * dependent on the consumed data volume, which is more robust than making it dependent on a
+ * record count.
+ */
+ private static final class TrackingFsDataInputStream extends FSDataInputStream {
+
+ private final FSDataInputStream stream;
+ private final long fileLength;
+ private final int batchSize;
+ private int remainingInBatch;
+
+ TrackingFsDataInputStream(FSDataInputStream stream, long fileLength, int batchSize) {
+ checkArgument(fileLength > 0L);
+ checkArgument(batchSize > 0);
+ this.stream = stream;
+ this.fileLength = fileLength;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+ stream.seek(desired);
+ remainingInBatch = 0; // after each seek, we need to start a new batch
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public int read() throws IOException {
+ remainingInBatch--;
+ return stream.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ remainingInBatch -= len;
+ return stream.read(b, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+
+ boolean hasRemainingInBatch() {
+ return remainingInBatch > 0;
+ }
+
+ void newBatch() {
+ remainingInBatch = batchSize;
+ }
+
+ long getFileLength() {
+ return fileLength;
+ }
+ }
+}
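
The adapter is the glue that lets any StreamFormat run on the BulkFormat-based reader machinery, with batch boundaries determined by the byte budget tracked in TrackingFsDataInputStream rather than by a record count. A one-line usage sketch (using the TextLineFormat defined later in this commit):

    // Sketch: a StreamFormat used where a BulkFormat is expected.
    BulkFormat<String> bulkFormat = new StreamFormatAdapter<>(new TextLineFormat());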
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
new file mode 100644
index 0000000..5c378ed
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The {@code BulkFormat} reads and decodes batches of records at a time. Examples of bulk formats
+ * are formats like ORC or Parquet.
+ *
+ * <p>The outer {@code BulkFormat} class acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link BulkFormat.Reader}, which is created in the
+ * {@link BulkFormat#createReader(Configuration, Path, long, long)} method. If a bulk reader is created
+ * based on a checkpoint during checkpointed streaming execution, then the reader is re-created in
+ * the {@link BulkFormat#restoreReader(Configuration, Path, long, long, CheckpointedPosition)} method.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts (like the next record delimiter, a
+ * block start, or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>The bulk reader returns an iterator per batch that it reads. The iterator produces records together
+ * with a position. That position is stored in the checkpointed state atomically with the processing of
+ * the record. That means it must be the position from where the reading can be resumed AFTER
+ * the record was processed; the position hence points effectively to the record AFTER the current record.
+ *
+ * <p>The simplest way to return this position information is to store no offset and simply store an
+ * incrementing count of records to skip after recovery. Given the above contract, the first record would
+ * be returned with a records-to-skip count of one, the second one with a count of two, etc.
+ *
+ * <p>Formats that have the ability to efficiently seek to a record (or to every n-th record) can use
+ * the position field to seek to a record directly and avoid having to read and discard many records
+ * on recovery.
+ *
+ * <p>Note on this design: Why do we not make the position point to the current record and always skip
+ * one record after recovery (the just processed record)? We need to be able to support formats where
+ * skipping records (even one) is not an option. For example, formats that execute (pushed down) filters
+ * may want to avoid a skip-record-count altogether, so that they don't skip the wrong records when
+ * the filter gets updated around a checkpoint/savepoint.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>For the {@code BulkFormat}, one batch (as returned by {@link BulkFormat.Reader#readBatch()}) is
+ * handed over as one.
+ */
+@PublicEvolving
+public interface BulkFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+ /**
+ * Creates a new reader that reads from {@code filePath} starting at {@code splitOffset} and reads
+ * until {@code splitLength} bytes after the offset.
+ */
+ BulkFormat.Reader<T> createReader(
+ Configuration config,
+ Path filePath,
+ long splitOffset,
+ long splitLength) throws IOException;
+
+ /**
+ * Creates a new reader that reads from {@code filePath} starting at {@code splitOffset} and reads
+ * until {@code splitLength} bytes after the offset. The reader is positioned at the given
+ * {@code checkpointedPosition}: it seeks to the checkpointed offset (if one is present) and then reads
+ * and discards the checkpointed number of records after that offset. This is typically part of restoring
+ * a reader to a checkpointed position.
+ */
+ BulkFormat.Reader<T> restoreReader(
+ Configuration config,
+ Path filePath,
+ long splitOffset,
+ long splitLength,
+ CheckpointedPosition checkpointedPosition) throws IOException;
+
+ /**
+ * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+ * per file, so that Flink can read multiple regions of the file concurrently.
+ *
+ * <p>See {@link BulkFormat top-level JavaDocs} (section "Splitting") for details.
+ */
+ boolean isSplittable();
+
+ /**
+ * Gets the type produced by this format. This type will be the type produced by the file
+ * source as a whole.
+ */
+ @Override
+ TypeInformation<T> getProducedType();
+
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The actual reader that reads the batches of records.
+ */
+ interface Reader<T> extends Closeable {
+
+ /**
+ * Reads one batch. The method should return null when reaching the end of the input.
+ * The returned batch will be handed over to the processing threads as one.
+ *
+ * <p>The returned iterator object and any contained objects may be held onto by the file source
+ * for some time, so it should not be immediately reused by the reader.
+ *
+ * <p>To implement reuse and to save object allocation, consider using a
+ * {@link org.apache.flink.connector.file.src.util.Pool} and recycle objects into the Pool in the
+ * {@link RecordIterator#releaseBatch()} method.
+ */
+ @Nullable
+ RecordIterator<T> readBatch() throws IOException;
+
+ /**
+ * Closes the reader and should release all resources.
+ */
+ @Override
+ void close() throws IOException;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * An iterator over records with their position in the file.
+ * The batch can be released via {@link #releaseBatch()} to support clean resource release and recycling.
+ *
+ * @param <T> The type of the record.
+ */
+ interface RecordIterator<T> {
+
+ /**
+ * Gets the next record from the file, together with its position.
+ *
+ * <p>The position information returned with the record points to the record AFTER the returned
+ * record, because it defines the point where the reading should resume once the current record
+ * is emitted. The position information is put in the source's state when the record
+ * is emitted. If a checkpoint is taken directly after the record is emitted, the checkpoint must
+ * describe where to resume the source reading from after that record.
+ *
+ * <p>Objects returned by this method may be reused by the iterator. By the time that this
+ * method is called again, no object returned from the previous call will be referenced any more.
+ * That makes it possible to have a single {@link MutableRecordAndPosition} object and return the
+ * same instance (with updated record and position) on every call.
+ */
+ @Nullable
+ RecordAndPosition<T> next();
+
+ /**
+ * Releases the batch that this iterator iterated over.
+ * This is not supposed to close the reader and its resources, but is simply a signal that this
+ * iterator is no longer used. This method can be used as a hook to recycle/reuse heavyweight
+ * object structures.
+ */
+ void releaseBatch();
+ }
+}
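
To make the batch-and-position contract concrete, this is roughly what a consumer of a BulkFormat.Reader looks like (a sketch; record emission is elided, and the RecordAndPosition accessors are assumed from this commit's util package):

    // Sketch: draining a reader batch by batch. Each returned position points
    // AFTER its record, so persisting it with the emitted record yields a
    // consistent resume point for checkpoints.
    static <T> void drain(BulkFormat.Reader<T> reader) throws IOException {
        try {
            BulkFormat.RecordIterator<T> batch;
            while ((batch = reader.readBatch()) != null) {
                RecordAndPosition<T> rec;
                while ((rec = batch.next()) != null) {
                    // emit rec.getRecord(); a checkpoint taken now would store
                    // new CheckpointedPosition(rec.getOffset(), rec.getRecordSkipCount())
                }
                batch.releaseBatch(); // signal that the batch can be recycled
            }
        } finally {
            reader.close();
        }
    }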
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java
new file mode 100644
index 0000000..a5f1def
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/FileRecordFormat.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A reader format that reads individual records from a file.
+ *
+ * <p>This format is for cases where the readers need access to the file directly or need to create a custom
+ * stream. For readers that can work directly on input streams, consider using the {@link StreamFormat}, which
+ * is more robust.
+ *
+ * <p>The outer class {@code FileRecordFormat} acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link FileRecordFormat.Reader}, which is created based on an
+ * input stream in the {@link #createReader(Configuration, Path, long, long)} method
+ * and restored (from checkpointed positions) in the method
+ * {@link #restoreReader(Configuration, Path, long, long, long)}.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts (like the next record delimiter, a
+ * block start, or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>Readers can optionally return the current position of the reader, via the
+ * {@link FileRecordFormat.Reader#getCheckpointedPosition()}. This can improve recovery speed from
+ * a checkpoint.
+ *
+ * <p>By default (if that method is not overridden or returns null), recovery from a checkpoint
+ * works by reading the split again and skipping the number of records that were processed before
+ * the checkpoint. Implementing this method allows formats to seek directly to that position, rather
+ * than read and discard a number of records.
+ *
+ * <p>The position is a combination of offset in the file and a number of records to skip after
+ * this offset (see {@link CheckpointedPosition}). This helps formats that cannot describe all
+ * record positions by an offset, for example because records are compressed in batches or stored
+ * in a columnar layout (e.g., ORC, Parquet).
+ * The default behavior can be viewed as returning a {@code CheckpointedPosition} where the offset
+ * is always zero and only the {@link CheckpointedPosition#getRecordsAfterOffset()} is incremented
+ * with each emitted record.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>This batching is by default based on a number of records. See {@link FileRecordFormat#RECORDS_PER_FETCH}
+ * to configure that handover batch size.
+ */
+@PublicEvolving
+public interface FileRecordFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+ /**
+ * Creates a new reader to read in this format. This method is called when a fresh reader is
+ * created for a split that was assigned from the enumerator. This method may also be called on
+ * recovery from a checkpoint, if the reader never stored an offset in the checkpoint
+ * (see {@link #restoreReader(Configuration, Path, long, long, long)} for details).
+ *
+ * <p>The {@code splitOffset} is the offset of the first byte of the split within the file, and
+ * {@code splitLength} is the length of the split in bytes. For non-splittable formats, the split
+ * covers the entire file.
+ */
+ FileRecordFormat.Reader<T> createReader(
+ Configuration config,
+ Path filePath,
+ long splitOffset,
+ long splitLength) throws IOException;
+
+ /**
+ * Restores a reader from a checkpointed position. This method is called when the reader is recovered
+ * from a checkpoint and the reader has previously stored an offset into the checkpoint, by returning
+ * from the {@link FileRecordFormat.Reader#getCheckpointedPosition()} a value with non-negative
+ * {@link CheckpointedPosition#getOffset() offset}. That value is supplied as the {@code restoredOffset}.
+ *
+ * <p>If the reader never produced a {@code CheckpointedPosition} with a non-negative offset before, then
+ * this method is not called, and the reader is created in the same way as a fresh reader via the method
+ * {@link #createReader(Configuration, Path, long, long)} and the appropriate number of
+ * records are read and discarded, to position the reader at the checkpointed position.
+ *
+ * <p>The {@code splitOffset} is the offset of the first byte of the split within the file, and
+ * {@code splitLength} is the length of the split in bytes. For non-splittable formats, the split
+ * covers the entire file.
+ */
+ FileRecordFormat.Reader<T> restoreReader(
+ Configuration config,
+ Path filePath,
+ long restoredOffset,
+ long splitOffset,
+ long splitLength) throws IOException;
+
+ /**
+ * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+ * per file, so that Flink can read multiple regions of the file concurrently.
+ *
+ * <p>See {@link FileRecordFormat top-level JavaDocs} (section "Splitting") for details.
+ */
+ boolean isSplittable();
+
+ /**
+ * Gets the type produced by this format. This type will be the type produced by the file
+ * source as a whole.
+ */
+ @Override
+ TypeInformation<T> getProducedType();
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Config option for the number of records to hand over in each fetch.
+ *
+ * <p>The number should be large enough so that the thread-to-thread handover overhead
+ * is amortized across the records, but small enough that these records together do
+ * not consume too much memory.
+ */
+ ConfigOption<Integer> RECORDS_PER_FETCH = ConfigOptions
+ .key("source.file.records.fetch-size")
+ .intType()
+ .defaultValue(128)
+ .withDescription("The number of records to hand over from the I/O thread to the file reader in one unit.");
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The actual reader that reads the records.
+ */
+ interface Reader<T> extends Closeable {
+
+ /**
+ * Reads the next record. Returns {@code null} when the input has reached its end.
+ */
+ @Nullable
+ T read() throws IOException;
+
+ /**
+ * Closes the reader to release all resources.
+ */
+ @Override
+ void close() throws IOException;
+
+ /**
+ * Optionally returns the current position of the reader. This can be implemented by readers that
+ * want to speed up recovery from a checkpoint.
+ *
+ * <p>The current position of the reader is the position of the next record that will be returned
+ * in a call to {@link #read()}.
+ *
+ * <p>See the {@link FileRecordFormat top-level class comment} (section "Checkpointing") for details.
+ */
+ @Nullable
+ default CheckpointedPosition getCheckpointedPosition() {
+ return null;
+ }
+ }
+}
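
The handover batch size is an ordinary config option, so it can be tuned on the Configuration handed to the source; for example (a sketch using the generic ConfigOption setter):

    // Sketch: raise the per-fetch record count from the default of 128 to 512.
    Configuration conf = new Configuration();
    conf.set(FileRecordFormat.RECORDS_PER_FETCH, 512);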
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java
new file mode 100644
index 0000000..938b40e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/SimpleStreamFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A simple version of the {@link StreamFormat}, for formats that are not splittable.
+ *
+ * <p>This format makes no distinction between creating readers from scratch (new file) or from a
+ * checkpoint. Because of that, if the reader actively checkpoints its position
+ * (via the {@link Reader#getCheckpointedPosition()} method) then the checkpointed offset must be
+ * a byte offset in the file from which the stream can be resumed as if it were the beginning of the file.
+ *
+ * <p>For all other details, please check the docs of {@link StreamFormat}.
+ *
+ * @param <T> The type of records created by this format reader.
+ */
+@PublicEvolving
+public abstract class SimpleStreamFormat<T> implements StreamFormat<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a new reader. This method is called both for the creation of a new reader (from the beginning
+ * of a file) and for restoring checkpointed readers.
+ *
+ * <p>If the reader previously checkpointed an offset, then the input stream will be positioned to
+ * that particular offset. Readers checkpoint an offset by returning a value from the
+ * {@link Reader#getCheckpointedPosition()} method with an offset other than
+ * {@link CheckpointedPosition#NO_OFFSET}.
+ */
+ public abstract Reader<T> createReader(Configuration config, FSDataInputStream stream) throws IOException;
+
+ /**
+ * Gets the type produced by this format. This type will be the type produced by the file
+ * source as a whole.
+ */
+ @Override
+ public abstract TypeInformation<T> getProducedType();
+
+ // ------------------------------------------------------------------------
+ // pre-defined methods from Stream Format
+ // ------------------------------------------------------------------------
+
+ /**
+ * This format is never splittable.
+ */
+ @Override
+ public final boolean isSplittable() {
+ return false;
+ }
+
+ @Override
+ public final Reader<T> createReader(
+ Configuration config,
+ FSDataInputStream stream,
+ long fileLen,
+ long splitEnd) throws IOException {
+
+ checkNotSplit(fileLen, splitEnd);
+
+ final long streamPos = stream.getPos();
+ checkArgument(streamPos == 0L,
+ "SimpleStreamFormat is not splittable, but found non-zero stream position (%s)",
+ streamPos);
+
+ return createReader(config, stream);
+ }
+
+ @Override
+ public final Reader<T> restoreReader(
+ final Configuration config,
+ final FSDataInputStream stream,
+ final long restoredOffset,
+ final long fileLen,
+ final long splitEnd) throws IOException {
+
+ checkNotSplit(fileLen, splitEnd);
+ stream.seek(restoredOffset);
+ return createReader(config, stream);
+ }
+
+ private static void checkNotSplit(long fileLen, long splitEnd) {
+ if (splitEnd != fileLen) {
+ throw new IllegalArgumentException(String.format(
+ "SimpleStreamFormat is not splittable, but found split end (%d) different from file length (%d)",
+ splitEnd, fileLen));
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java
new file mode 100644
index 0000000..57424b2
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/StreamFormat.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A reader format that reads individual records from a stream.
+ *
+ * <p>The outer class {@code StreamFormat} acts mainly as a configuration holder and factory for the reader.
+ * The actual reading is done by the {@link StreamFormat.Reader}, which is created based on an
+ * input stream in the {@link #createReader(Configuration, FSDataInputStream, long, long)} method
+ * and restored (from checkpointed positions) in the method
+ * {@link #restoreReader(Configuration, FSDataInputStream, long, long, long)}.
+ *
+ * <p>Compared to the {@link BulkFormat}, the stream format handles a few things out-of-the-box, like
+ * deciding how to batch records or dealing with compression.
+ *
+ * <p>For a simpler version of this interface, for formats that do not support splitting or logical record
+ * offsets during checkpointing, see {@link SimpleStreamFormat}.
+ *
+ * <h2>Splitting</h2>
+ *
+ * <p>File splitting means dividing a file into multiple regions that can be read independently.
+ * Whether a format supports splitting is indicated via the {@link #isSplittable()} method.
+ *
+ * <p>Splitting has the potential to increase parallelism and performance, but poses additional
+ * constraints on the format readers: Readers need to be able to find a consistent starting point
+ * within the file near the offset where the split starts (like the next record delimiter, a
+ * block start, or a sync marker). This is not necessarily possible for all formats, which is why
+ * splitting is optional.
+ *
+ * <h2>Checkpointing</h2>
+ *
+ * <p>Readers can optionally return the current position of the reader, via the
+ * {@link StreamFormat.Reader#getCheckpointedPosition()}. This can improve recovery speed from
+ * a checkpoint.
+ *
+ * <p>By default (if that method is not overridden or returns null), recovery from a checkpoint
+ * works by reading the split again and skipping the number of records that were processed before
+ * the checkpoint. Implementing this method allows formats to seek directly to that position, rather
+ * than read and discard a number of records.
+ *
+ * <p>The position is a combination of offset in the file and a number of records to skip after
+ * this offset (see {@link CheckpointedPosition}). This helps formats that cannot describe all
+ * record positions by an offset, for example because records are compressed in batches or stored
+ * in a columnar layout (e.g., ORC, Parquet).
+ * The default behavior can be viewed as returning a {@code CheckpointedPosition} where the offset
+ * is always zero and only the {@link CheckpointedPosition#getRecordsAfterOffset()} is incremented
+ * with each emitted record.
+ *
+ * <h2>Serializable</h2>
+ *
+ * <p>Like many other API classes in Flink, the outer class is serializable to support sending instances
+ * to distributed workers for parallel execution. This is purely short-term serialization for RPC and
+ * no instance of this will be long-term persisted in a serialized form.
+ *
+ * <h2>Record Batching</h2>
+ *
+ * <p>Internally in the file source, the readers pass batches of records from the reading
+ * threads (that perform the typically blocking I/O operations) to the async mailbox threads that
+ * do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time)
+ * greatly reduces the thread-to-thread handover overhead.
+ *
+ * <p>This batching is by default based on the I/O fetch size for the {@code StreamFormat}, meaning
+ * the set of records derived from one I/O buffer will be handed over as one.
+ * See {@link StreamFormat#FETCH_IO_SIZE} to configure that fetch size.
+ *
+ * @param <T> The type of records created by this format reader.
+ */
+@PublicEvolving
+public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {
+
+ /**
+ * Creates a new reader to read in this format. This method is called when a fresh reader is
+ * created for a split that was assigned from the enumerator. This method may also be called on
+ * recovery from a checkpoint, if the reader never stored an offset in the checkpoint
+ * (see {@link #restoreReader(Configuration, FSDataInputStream, long, long, long)} for details).
+ *
+ * <p>If the format is {@link #isSplittable() splittable}, then the {@code stream} is positioned
+ * to the beginning of the file split, otherwise it will be at position zero.
+ *
+ * <p>The {@code fileLen} is the length of the entire file, while {@code splitEnd} is the offset of
+ * the first byte after the split end boundary (exclusive end boundary). For non-splittable formats,
+ * both values are identical.
+ */
+ Reader<T> createReader(
+ Configuration config,
+ FSDataInputStream stream,
+ long fileLen,
+ long splitEnd) throws IOException;
+
+ /**
+ * Restores a reader from a checkpointed position. This method is called when the reader is recovered
+ * from a checkpoint and the reader has previously stored an offset into the checkpoint, by returning
+ * from the {@link Reader#getCheckpointedPosition()} a value with non-negative
+ * {@link CheckpointedPosition#getOffset() offset}. That value is supplied as the {@code restoredOffset}.
+ *
+ * <p>If the format is {@link #isSplittable() splittable}, then the {@code stream} is positioned
+ * to the beginning of the file split, otherwise it will be at position zero. The stream is NOT
+ * positioned to the checkpointed offset, because the format is free to interpret this offset in
+ * a different way than the byte offset in the file (for example as a record index).
+ *
+ * <p>If the reader never produced a {@code CheckpointedPosition} with a non-negative offset before, then
+ * this method is not called, and the reader is created in the same way as a fresh reader via the method
+ * {@link #createReader(Configuration, FSDataInputStream, long, long)} and the appropriate number of
+ * records are read and discarded, to position the reader at the checkpointed position.
+ *
+ * <p>Having a different method for restoring readers to a checkpointed position allows readers to
+ * seek to the start position differently in that case, compared to when the reader is created from
+ * a split offset generated at the enumerator. In the latter case, the offsets are commonly "approximate",
+ * because the enumerator typically generates splits based only on metadata. Readers then have to skip some
+ * bytes while searching for the next position to start from (based on a delimiter, sync marker, block
+ * offset, etc.). In contrast, checkpointed offsets are often precise, because they were recorded as the
+ * reader went through the data stream. Starting a reader from a checkpointed offset may hence not
+ * require any search for the next delimiter/block/marker.
+ *
+ * <p>The {@code fileLen} is the length of the entire file, while {@code splitEnd} is the offset of
+ * the first byte after the split end boundary (exclusive end boundary). For non-splittable formats,
+ * both values are identical.
+ */
+ Reader<T> restoreReader(
+ Configuration config,
+ FSDataInputStream stream,
+ long restoredOffset,
+ long fileLen,
+ long splitEnd) throws IOException;
+
+ /**
+ * Checks whether this format is splittable. Splittable formats allow Flink to create multiple splits
+ * per file, so that Flink can read multiple regions of the file concurrently.
+ *
+ * <p>See {@link StreamFormat top-level JavaDocs} (section "Splitting") for details.
+ */
+ boolean isSplittable();
+
+ /**
+ * Gets the type produced by this format. This type will be the type produced by the file
+ * source as a whole.
+ */
+ @Override
+ TypeInformation<T> getProducedType();
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The config option that defines how many bytes the I/O thread reads in one fetch operation.
+ */
+ ConfigOption<MemorySize> FETCH_IO_SIZE = ConfigOptions
+ .key("source.file.stream.io-fetch-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(1L))
+ .withDescription("The approximate number of bytes per fetch that is passed from the I/O thread to the file reader.");
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The actual reader that reads the records.
+ */
+ interface Reader<T> extends Closeable {
+
+ /**
+ * Reads the next record. Returns {@code null} when the input has reached its end.
+ */
+ @Nullable
+ T read() throws IOException;
+
+ /**
+ * Closes the reader to release all resources.
+ */
+ @Override
+ void close() throws IOException;
+
+ /**
+ * Optionally returns the current position of the reader. This can be implemented by readers that
+ * want to speed up recovery from a checkpoint.
+ *
+ * <p>The current position of the reader is the position of the next record that will be returned
+ * in a call to {@link #read()}.
+ *
+ * <p>See the {@link StreamFormat top-level class comment} (section "Checkpointing") for details.
+ */
+ @Nullable
+ default CheckpointedPosition getCheckpointedPosition() {
+ return null;
+ }
+ }
+}
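
Analogously to the record-count option on the FileRecordFormat, the byte-based fetch size is a plain config option; a sketch of tuning it:

    // Sketch: raise the per-fetch I/O budget from the default of 1 MiB to 4 MiB.
    Configuration conf = new Configuration();
    conf.set(StreamFormat.FETCH_IO_SIZE, MemorySize.ofMebiBytes(4));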
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
new file mode 100644
index 0000000..4eccd36
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.reader;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+/**
+ * A reader format that reads text lines from a file.
+ *
+ * <p>The reader uses Java's built-in {@link InputStreamReader} to decode the byte stream using various
+ * supported charset encodings.
+ *
+ * <p>This format does not support optimized recovery from checkpoints. On recovery, it will re-read and
+ * discard the number of lines that were processed before the last checkpoint. That is because
+ * the offsets of lines in the file cannot be tracked through the charset decoders with their
+ * internal buffering of stream input and charset decoder state.
+ */
+@PublicEvolving
+public class TextLineFormat extends SimpleStreamFormat<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String DEFAULT_CHARSET_NAME = "UTF-8";
+
+ private final String charsetName;
+
+ public TextLineFormat() {
+ this(DEFAULT_CHARSET_NAME);
+ }
+
+ public TextLineFormat(String charsetName) {
+ this.charsetName = charsetName;
+ }
+
+ @Override
+ public Reader createReader(Configuration config, FSDataInputStream stream) throws IOException {
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(stream, charsetName));
+ return new Reader(reader);
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return Types.STRING;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The actual reader for the {@code TextLineFormat}.
+ */
+ public static final class Reader implements StreamFormat.Reader<String> {
+
+ private final BufferedReader reader;
+
+ Reader(final BufferedReader reader) {
+ this.reader = reader;
+ }
+
+ @Nullable
+ @Override
+ public String read() throws IOException {
+ return reader.readLine();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ }
+}
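
As an end-to-end usage sketch (the forRecordStreamFormat builder entry point is assumed from the FileSource class added in this commit; 'env' is a StreamExecutionEnvironment):

    // Sketch: a bounded file source that reads a directory of text files line by line.
    final FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineFormat(), new Path("file:///tmp/input"))
            .build();

    final DataStream<String> lines = env.fromSource(
            source, WatermarkStrategy.noWatermarks(), "text-file-source");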
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java
new file mode 100644
index 0000000..af500c5
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/ArrayResultIterator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns the elements of an array, one after the other.
+ * The iterator is mutable to support object reuse and supports recycling.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class ArrayResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+ private E[] records;
+ private int num;
+ private int pos;
+
+ private final MutableRecordAndPosition<E> recordAndPosition;
+
+ /**
+ * Creates a new, initially empty, {@code ArrayResultIterator}.
+ */
+ public ArrayResultIterator() {
+ this(null);
+ }
+
+ /**
+ * Creates a new, initially empty, {@code ArrayResultIterator}.
+ * The iterator calls the given {@code recycler} when the {@link #releaseBatch()} method is called.
+ */
+ public ArrayResultIterator(@Nullable Runnable recycler) {
+ super(recycler);
+ this.recordAndPosition = new MutableRecordAndPosition<>();
+ }
+
+ // -------------------------------------------------------------------------
+ // Setting
+ // -------------------------------------------------------------------------
+
+ /**
+ * Sets the records to be returned by this iterator.
+ * Each record's {@link RecordAndPosition} will have the same offset (for {@link RecordAndPosition#getOffset()}).
+ * The first returned record will have a records-to-skip count of {@code skipCountOfFirst + 1}, following
+ * the contract that each record needs to point to the position AFTER itself
+ * (because a checkpoint taken after the record was emitted needs to resume from after that record).
+ */
+ public void set(final E[] records, final int num, final long offset, final long skipCountOfFirst) {
+ this.records = records;
+ this.num = num;
+ this.pos = 0;
+ this.recordAndPosition.set(null, offset, skipCountOfFirst);
+ }
+
+ // -------------------------------------------------------------------------
+ // Result Iterator Methods
+ // -------------------------------------------------------------------------
+
+ @Nullable
+ @Override
+ public RecordAndPosition<E> next() {
+ if (pos < num) {
+ recordAndPosition.setNext(records[pos++]);
+ return recordAndPosition;
+ }
+ else {
+ return null;
+ }
+ }
+}
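
The intended reuse pattern is to allocate the iterator once and re-populate it per batch, with the recycler hook returning resources to a pool when the batch is released; a minimal sketch:

    // Sketch: one reusable iterator; the recycler runs on releaseBatch().
    final ArrayResultIterator<String> iter =
            new ArrayResultIterator<>(() -> { /* return the batch array to a pool */ });

    final String[] batch = {"a", "b", "c"};
    // batch starts at byte offset 1024; no records were returned before it
    iter.set(batch, batch.length, 1024L, 0L);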
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java
new file mode 100644
index 0000000..bbdd09c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/CheckpointedPosition.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The position of a reader, to be stored in a checkpoint. The position consists of a record offset
+ * and a number of records to skip after that offset. The offset is optional, it may take the value
+ * {@link CheckpointedPosition#NO_OFFSET}, in which case only the records-to-skip count is used.
+ *
+ * <p>The combination of offset and records-to-skip makes it possible to represent the position of a wide
+ * variety of readers. In the simplest case, readers might store no offset and only store how many
+ * records they previously returned. On the other hand, readers that can precisely point to each
+ * record via a position can store that in the checkpoint. Readers that have occasional addressable positions
+ * (like sync markers, block starts, etc.) can store those together with the records skipped after the
+ * last marker.
+ */
+@PublicEvolving
+public final class CheckpointedPosition implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constant for the offset, reflecting that the position does not contain any offset information.
+ * It is used in positions that are defined only by a number of records to skip.
+ */
+ public static final long NO_OFFSET = -1L;
+
+ private final long offset;
+ private final long recordsAfterOffset;
+
+ /**
+ * Creates a new CheckpointedPosition for given offset and records-to-skip.
+ *
+ * @param offset The offset that the reader will seek to when restored from this checkpoint.
+ * @param recordsAfterOffset The records to skip after the offset.
+ */
+ public CheckpointedPosition(long offset, long recordsAfterOffset) {
+ checkArgument(offset >= -1, "offset must be >= 0 or NO_OFFSET");
+ checkArgument(recordsAfterOffset >= 0, "recordsAfterOffset must be >= 0");
+ this.offset = offset;
+ this.recordsAfterOffset = recordsAfterOffset;
+ }
+
+ /**
+ * Gets the offset that the reader will seek to when restored from this checkpoint.
+ */
+ public long getOffset() {
+ return offset;
+ }
+
+ /**
+ * Gets the records to skip after the offset.
+ */
+ public long getRecordsAfterOffset() {
+ return recordsAfterOffset;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final CheckpointedPosition that = (CheckpointedPosition) o;
+ return offset == that.offset &&
+ recordsAfterOffset == that.recordsAfterOffset;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(offset, recordsAfterOffset);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("CheckpointedPosition: offset=%s, recordsToSkip=%d",
+ offset == NO_OFFSET ? "NO_OFFSET" : String.valueOf(offset),
+ recordsAfterOffset);
+ }
+}
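
Two illustrative positions, matching the semantics described in the class comment
(the byte offsets and record counts are made up):

    // A reader with addressable positions: seek to byte 4096, then skip 17 records.
    CheckpointedPosition afterMarker = new CheckpointedPosition(4096L, 17L);

    // A reader without addressable positions: re-read from the start, skipping 42 records.
    CheckpointedPosition skipOnly = new CheckpointedPosition(CheckpointedPosition.NO_OFFSET, 42L);
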
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java
new file mode 100644
index 0000000..5369e54
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/IteratorResultIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns the elements of an iterator,
+ * augmented with position information.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class IteratorResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+ private final Iterator<E> records;
+
+ private final MutableRecordAndPosition<E> recordAndPosition;
+
+ /**
+ * Creates a new {@code RecordIterator} returning the records from the given iterator, augmented
+ * with their position information.
+ *
+ * <p>Each record's {@link RecordAndPosition} will have the same offset value for
+ * {@link RecordAndPosition#getOffset()}. The first returned record will have a records-to-skip count
+ * of {@code startingSkipCount + 1}, following the contract that each record needs to point to the
+ * position AFTER itself (because a checkpoint taken after the record was emitted needs to resume
+ * from after that record).
+ */
+ public IteratorResultIterator(
+ final Iterator<E> records,
+ final long offset,
+ final long startingSkipCount) {
+ this(records, offset, startingSkipCount, null);
+ }
+
+ /**
+ * Creates a new {@code RecordIterator} returning the records from the given iterator, augmented
+ * with their position information. When the iterator is marked as done (via {@link #releaseBatch()}), the
+ * given {@code recycler} is called.
+ *
+ * <p>Each record's {@link RecordAndPosition} will have the same offset value for
+ * {@link RecordAndPosition#getOffset()}. The first returned record will have a records-to-skip count
+ * of {@code startingSkipCount + 1}, following the contract that each record needs to point to the
+ * position AFTER itself (because a checkpoint taken after the record was emitted needs to resume
+ * from after that record).
+ */
+ public IteratorResultIterator(
+ final Iterator<E> records,
+ final long offset,
+ final long startingSkipCount,
+ final @Nullable Runnable recycler) {
+
+ super(recycler);
+ this.records = records;
+ this.recordAndPosition = new MutableRecordAndPosition<>();
+ this.recordAndPosition.setPosition(offset, startingSkipCount);
+ }
+
+ // -------------------------------------------------------------------------
+ // Result Iterator Methods
+ // -------------------------------------------------------------------------
+
+ @Nullable
+ @Override
+ public RecordAndPosition<E> next() {
+ if (records.hasNext()) {
+ recordAndPosition.setNext(records.next());
+ return recordAndPosition;
+ } else {
+ return null;
+ }
+ }
+}
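
A sketch of wrapping a plain java.util.Iterator with the class above (illustrative
values; assumes java.util.Arrays and java.util.List are imported):

    List<Integer> batch = Arrays.asList(7, 8, 9);
    IteratorResultIterator<Integer> iter =
            new IteratorResultIterator<>(batch.iterator(), 256L, 0L); // batch starts at offset 256

    RecordAndPosition<Integer> rec;
    while ((rec = iter.next()) != null) {
        // every record reports offset 256; the skip counts advance as 1, 2, 3
    }
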
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java
new file mode 100644
index 0000000..07de831
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/MutableRecordAndPosition.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+/**
+ * A mutable version of the {@link RecordAndPosition}.
+ *
+ * <p>This mutable object is useful in cases where only one instance of a {@code RecordAndPosition}
+ * is needed at a time, like for the result values of the {@link BulkFormat.RecordIterator}.
+ */
+@PublicEvolving
+public class MutableRecordAndPosition<E> extends RecordAndPosition<E> {
+
+ /**
+ * Updates the record and position in this object.
+ */
+ public void set(E record, long offset, long recordSkipCount) {
+ this.record = record;
+ this.offset = offset;
+ this.recordSkipCount = recordSkipCount;
+ }
+
+ /**
+ * Sets the position without setting a record.
+ */
+ public void setPosition(long offset, long recordSkipCount) {
+ this.offset = offset;
+ this.recordSkipCount = recordSkipCount;
+ }
+
+ /**
+ * Sets the next record of a sequence. This increments the {@code recordSkipCount} by one.
+ */
+ public void setNext(E record) {
+ this.record = record;
+ this.recordSkipCount++;
+ }
+}
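
A sketch of the intended call sequence (values are illustrative):

    MutableRecordAndPosition<String> pos = new MutableRecordAndPosition<>();
    pos.setPosition(0L, 0L); // batch starts at offset 0, nothing skipped yet
    pos.setNext("first");    // record = "first", skip count becomes 1
    pos.setNext("second");   // record = "second", skip count becomes 2
    // a checkpoint taken now resumes AFTER "second": offset 0, skip 2 records
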
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java
new file mode 100644
index 0000000..274e775
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Pool.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * A pool to cache and recycle heavyweight objects, to reduce object allocation.
+ *
+ * <p>This pool can be used in the {@link BulkFormat.Reader}, when the returned objects are heavyweight
+ * and need to be reused for efficiency. Because the reading happens in I/O threads while the record
+ * processing happens in Flink's main processing threads, these objects cannot be reused immediately
+ * after being returned. They can be reused once they are recycled back to the pool.
+ *
+ * @param <T> The type of object cached in the pool.
+ */
+@PublicEvolving
+public class Pool<T> {
+
+ private final ArrayBlockingQueue<T> pool;
+
+ private final Recycler<T> recycler;
+
+ private final int poolCapacity;
+ private int poolSize;
+
+ /**
+ * Creates a pool with the given capacity. No more than that many elements may be added to the pool.
+ */
+ public Pool(int poolCapacity) {
+ this.pool = new ArrayBlockingQueue<>(poolCapacity);
+ this.recycler = this::addBack;
+ this.poolCapacity = poolCapacity;
+ this.poolSize = 0;
+ }
+
+ /**
+ * Gets the recycler for this pool. The recycler returns its given objects back to this pool.
+ */
+ public Recycler<T> recycler() {
+ return recycler;
+ }
+
+ /**
+ * Adds an entry to the pool.
+ * This method fails if called more often than the pool capacity specified during construction.
+ */
+ public synchronized void add(T object) {
+ if (poolSize >= poolCapacity) {
+ throw new IllegalStateException("No space left in pool");
+ }
+ poolSize++;
+
+ addBack(object);
+ }
+
+ /**
+ * Gets the next cached entry. This blocks until the next entry is available.
+ */
+ public T pollEntry() throws InterruptedException {
+ return pool.take();
+ }
+
+ /**
+ * Tries to get the next cached entry. If the pool is empty, this method returns null.
+ */
+ @Nullable
+ public T tryPollEntry() {
+ return pool.poll();
+ }
+
+ /**
+ * Internal callback to put an entry back to the pool.
+ */
+ void addBack(T object) {
+ pool.add(object);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * A Recycler puts objects into the pool that the recycler is associated with.
+ *
+ * @param <T> The pooled and recycled type.
+ */
+ @FunctionalInterface
+ public interface Recycler<T> {
+
+ /**
+ * Recycles the given object to the pool that this recycler works with.
+ */
+ void recycle(T object);
+ }
+}
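
A sketch of the intended hand-off (condensed to one thread here; the buffer size is
illustrative, and pollEntry() declares InterruptedException, so the fragment belongs
in a method that allows it):

    Pool<byte[]> pool = new Pool<>(2);
    pool.add(new byte[1 << 20]); // seed the pool with two reusable buffers
    pool.add(new byte[1 << 20]);

    byte[] buffer = pool.pollEntry();  // blocks until a buffer is available
    // ... the I/O thread fills the buffer and hands it downstream ...
    pool.recycler().recycle(buffer);   // the processing thread returns it when done
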
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java
new file mode 100644
index 0000000..4c5079c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordAndPosition.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A record, together with the reader position to be stored in the checkpoint.
+ *
+ * <p>The position defines the point in the reader AFTER the record. Record processing and
+ * updating checkpointed state happens atomically. The position points to where the reader should
+ * resume after this record is processed.
+ *
+ * <p>For example, the very first record in a file split could have an {@code offset} of zero and a
+ * {@code recordSkipCount} of one.
+ *
+ * <p>This class is immutable for safety. Use {@link MutableRecordAndPosition} if you need a mutable
+ * version for efficiency.
+ *
+ * <p>Note on this design: Why do we not make the position point to the current record and always skip
+ * one record after recovery (the just processed record)? We need to be able to support formats where
+ * skipping records (even one) is not an option. For example formats that execute (pushed down) filters
+ * may want to avoid a skip-record-count altogether, so that they don't skip the wrong records when
+ * the filter gets updated around a checkpoint/savepoint.
+ */
+@PublicEvolving
+public class RecordAndPosition<E> {
+
+ /**
+ * Constant for the offset, reflecting that the position does not contain any offset information.
+ * It is used in positions that are defined only by a number of records to skip.
+ */
+ public static final long NO_OFFSET = CheckpointedPosition.NO_OFFSET;
+
+ // these are package private and non-final so we can mutate them from the MutableRecordAndPosition
+ E record;
+ long offset;
+ long recordSkipCount;
+
+ /**
+ * Creates a new {@code RecordAndPosition} with the given record and position info.
+ */
+ public RecordAndPosition(E record, long offset, long recordSkipCount) {
+ this.record = record;
+ this.offset = offset;
+ this.recordSkipCount = recordSkipCount;
+ }
+
+ /**
+ * Package private constructor for the {@link MutableRecordAndPosition}.
+ */
+ RecordAndPosition() {}
+
+ // ------------------------------------------------------------------------
+
+ public E getRecord() {
+ return record;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getRecordSkipCount() {
+ return recordSkipCount;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return String.format("%s @ %d + %d", record, offset, recordSkipCount);
+ }
+}
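
An illustrative instance, following the "position AFTER the record" contract from
the class comment:

    // The very first record of a split: it sits at offset 0, and recovery must
    // skip one record (this one) when resuming from a checkpoint taken after it.
    RecordAndPosition<String> first = new RecordAndPosition<>("line-1", 0L, 1L);
    System.out.println(first); // prints: line-1 @ 0 + 1
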
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java
new file mode 100644
index 0000000..47cf967
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecyclableIterator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility base class for iterators that accept a recycler.
+ *
+ * @param <E> The type of the records returned by the iterator.
+ */
+@Internal
+abstract class RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+ @Nullable
+ private final Runnable recycler;
+
+ /**
+ * Creates a {@code RecyclableIterator} with the given optional recycler.
+ */
+ protected RecyclableIterator(@Nullable Runnable recycler) {
+ this.recycler = recycler;
+ }
+
+ @Override
+ public void releaseBatch() {
+ if (recycler != null) {
+ recycler.run();
+ }
+ }
+}
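
A hypothetical minimal subclass (not part of this commit; since the base class is
package-private, it would have to live in the same util package, with
javax.annotation.Nullable imported):

    final class EmptySplitIterator<E> extends RecyclableIterator<E> {

        EmptySplitIterator(@Nullable Runnable recycler) {
            super(recycler);
        }

        @Override
        @Nullable
        public RecordAndPosition<E> next() {
            return null; // no records; releaseBatch() still runs the recycler
        }
    }
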
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java
new file mode 100644
index 0000000..7b56e5c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/SingletonResultIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+import javax.annotation.Nullable;
+
+/**
+ * A simple {@link BulkFormat.RecordIterator} that returns a single value.
+ * The iterator is mutable to support object reuse and supports recycling.
+ *
+ * @param <E> The type of the record returned by the iterator.
+ */
+@PublicEvolving
+public final class SingletonResultIterator<E> extends RecyclableIterator<E> implements BulkFormat.RecordIterator<E> {
+
+ @Nullable
+ private RecordAndPosition<E> element;
+
+ private final MutableRecordAndPosition<E> recordAndPosition;
+
+ public SingletonResultIterator() {
+ this(null);
+ }
+
+ public SingletonResultIterator(@Nullable Runnable recycler) {
+ super(recycler);
+ this.recordAndPosition = new MutableRecordAndPosition<>();
+ }
+
+ // -------------------------------------------------------------------------
+ // Setting
+ // -------------------------------------------------------------------------
+
+ /**
+ * Sets the single record to be returned by this iterator.
+ * The offset and records-to-skip count will be used as provided here for the returned
+ * {@link RecordAndPosition}, meaning they need to point to AFTER this specific record
+ * (because a checkpoint taken after the record was processed needs to resume from after this record).
+ */
+ public void set(final E element, final long offset, final long skipCount) {
+ this.recordAndPosition.set(element, offset, skipCount);
+ this.element = this.recordAndPosition;
+ }
+
+ // -------------------------------------------------------------------------
+ // Result Iterator Methods
+ // -------------------------------------------------------------------------
+
+ @Nullable
+ @Override
+ public RecordAndPosition<E> next() {
+ final RecordAndPosition<E> next = this.element;
+ this.element = null;
+ return next;
+ }
+}
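
A minimal usage sketch (values are illustrative):

    SingletonResultIterator<String> iter = new SingletonResultIterator<>();
    iter.set("only-record", 0L, 1L); // position AFTER the record: offset 0, skip 1

    RecordAndPosition<String> first = iter.next();  // returns the record once
    RecordAndPosition<String> second = iter.next(); // null: the iterator is exhausted
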
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java
new file mode 100644
index 0000000..bdadc1b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/Utils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Miscellaneous utilities for the file source.
+ */
+@PublicEvolving
+public final class Utils {
+
+ /**
+ * Runs the given {@code SupplierWithException} (a piece of code producing a result).
+ * If an exception happens during that, the given closeable is quietly closed.
+ */
+ public static <E> E doWithCleanupOnException(
+ final Closeable toCleanUp,
+ final SupplierWithException<E, IOException> code) throws IOException {
+
+ try {
+ return code.get();
+ }
+ catch (Throwable t) {
+ IOUtils.closeQuietly(toCleanUp);
+ ExceptionUtils.rethrowIOException(t);
+ return null; // silence the compiler
+ }
+ }
+
+ /**
+ * Runs the given {@code ThrowingRunnable}. If an exception happens during that, the given closeable
+ * is quietly closed.
+ */
+ public static void doWithCleanupOnException(
+ final Closeable toCleanUp,
+ final ThrowingRunnable<IOException> code) throws IOException {
+
+ doWithCleanupOnException(toCleanUp, (SupplierWithException<Void, IOException>) () -> {
+ code.run();
+ return null;
+ });
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** This class is not meant to be instantiated. */
+ private Utils() {}
+}
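
A sketch of the cleanup-on-exception pattern (fileSystem, path, format, and config
are placeholders, not part of this commit; the createReader signature matches the
SimpleStreamFormat introduced later in this commit, and the fragment assumes a
method that throws IOException):

    FSDataInputStream in = fileSystem.open(path); // any Closeable works as the cleanup target
    StreamFormat.Reader<String> reader = Utils.doWithCleanupOnException(
            in,
            () -> format.createReader(config, in)); // if this throws, 'in' is closed first
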
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
new file mode 100644
index 0000000..278d214
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import org.junit.After;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * This test simulates readers that just produce byte arrays very fast. The test
+ * is meant to check that this does not break the system in terms of object allocations, etc.
+ */
+public class FileSourceHeavyThroughputTest {
+
+ /** Testing file system reference, to be cleaned up in an @After method. That way it also gets
+ * cleaned up on a test failure, without needing finally clauses in every test. */
+ private TestingFileSystem testFs;
+
+ @After
+ public void unregisterTestFs() throws Exception {
+ if (testFs != null) {
+ testFs.unregister();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testHeavyThroughput() throws Exception {
+ final Path path = new Path("testfs:///testpath");
+ final long fileSize = 20L << 30; // 20 GB
+ final FileSourceSplit split = new FileSourceSplit("testsplitId", path, 0, fileSize);
+
+ testFs = TestingFileSystem.createForFileStatus(
+ path.toUri().getScheme(),
+ TestingFileSystem.TestFileStatus.forFileWithStream(path, fileSize, new GeneratingInputStream(fileSize)));
+ testFs.register();
+
+ final FileSource<byte[]> source = FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
+ final SourceReader<byte[], FileSourceSplit> reader = source.createReader(new NoOpReaderContext());
+ reader.addSplits(Collections.singletonList(split));
+ reader.handleSourceEvents(new NoMoreSplitsEvent());
+
+ final ReaderOutput<byte[]> out = new NoOpReaderOutput<>();
+
+ InputStatus status;
+ while ((status = reader.pollNext(out)) != InputStatus.END_OF_INPUT) {
+ // if nothing is available currently, wait for more
+ if (status == InputStatus.NOTHING_AVAILABLE) {
+ reader.isAvailable().get();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // test mocks
+ // ------------------------------------------------------------------------
+
+ private static final class ArrayReader implements StreamFormat.Reader<byte[]> {
+
+ private static final int ARRAY_SIZE = 1 << 20; // 1 MiB
+
+ private final FSDataInputStream in;
+
+ ArrayReader(FSDataInputStream in) {
+ this.in = in;
+ }
+
+ @Nullable
+ @Override
+ public byte[] read() throws IOException {
+ final byte[] array = new byte[ARRAY_SIZE];
+ final int read = in.read(array);
+ if (read == array.length) {
+ return array;
+ } else if (read == -1) {
+ return null;
+ } else {
+ return Arrays.copyOf(array, read);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+ }
+
+ private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Reader<byte[]> createReader(Configuration config, FSDataInputStream stream) throws IOException {
+ return new ArrayReader(stream);
+ }
+
+ @Override
+ public TypeInformation<byte[]> getProducedType() {
+ return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+ }
+ }
+
+ private static final class GeneratingInputStream extends FSDataInputStream {
+
+ private final long length;
+ private long pos;
+
+ GeneratingInputStream(long length) {
+ this.length = length;
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+ checkArgument(desired >= 0 && desired <= length);
+ pos = desired;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (pos < length) {
+ pos++;
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (pos < length) {
+ final int remaining = (int) Math.min(len, length - pos);
+ pos += remaining;
+ return remaining;
+ } else {
+ return -1;
+ }
+ }
+ }
+
+ private static final class NoOpReaderContext implements SourceReaderContext {
+
+ @Override
+ public MetricGroup metricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return new Configuration();
+ }
+
+ @Override
+ public String getLocalHostName() {
+ return "localhost";
+ }
+
+ @Override
+ public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+ }
+
+ private static final class NoOpReaderOutput<E> implements ReaderOutput<E> {
+
+ @Override
+ public void collect(E record) {}
+
+ @Override
+ public void collect(E record, long timestamp) {}
+
+ @Override
+ public void emitWatermark(Watermark watermark) {}
+
+ @Override
+ public void markIdle() {}
+
+ @Override
+ public SourceOutput<E> createOutputForSplit(String splitId) {
+ return this;
+ }
+
+ @Override
+ public void releaseOutputForSplit(String splitId) {}
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java
new file mode 100644
index 0000000..16f5720
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitSerializerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Unit tests for the {@link FileSourceSplitSerializer}.
+ */
+public class FileSourceSplitSerializerTest {
+
+ @Test
+ public void serializeSplitWithHosts() throws Exception {
+ final FileSourceSplit split = new FileSourceSplit(
+ "random-id",
+ new Path("hdfs://namenode:14565/some/path/to/a/file"),
+ 100_000_000,
+ 64_000_000,
+ "host1", "host2", "host3");
+
+ final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+ assertSplitsEqual(split, deSerialized);
+ }
+
+ @Test
+ public void serializeSplitWithoutHosts() throws Exception {
+ final FileSourceSplit split = new FileSourceSplit(
+ "some-id",
+ new Path("file:/some/path/to/a/file"),
+ 0,
+ 0);
+
+ final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+ assertSplitsEqual(split, deSerialized);
+ }
+
+ @Test
+ public void serializeSplitWithReaderPosition() throws Exception {
+ final FileSourceSplit split = new FileSourceSplit(
+ "random-id",
+ new Path("hdfs://namenode:14565/some/path/to/a/file"),
+ 100_000_000,
+ 64_000_000,
+ new String[] {"host1", "host2", "host3"},
+ new CheckpointedPosition(7665391L, 100L));
+
+ final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+ assertSplitsEqual(split, deSerialized);
+ }
+
+ @Test
+ public void repeatedSerialization() throws Exception {
+ final FileSourceSplit split = new FileSourceSplit(
+ "an-id",
+ new Path("s3://some-bucket/key/to/the/object"),
+ 0,
+ 1234567);
+
+ serializeAndDeserialize(split);
+ serializeAndDeserialize(split);
+ final FileSourceSplit deSerialized = serializeAndDeserialize(split);
+
+ assertSplitsEqual(split, deSerialized);
+ }
+
+ @Test
+ public void repeatedSerializationCaches() throws Exception {
+ final FileSourceSplit split = new FileSourceSplit(
+ "random-id",
+ new Path("hdfs://namenode:14565/some/path/to/a/file"),
+ 100_000_000,
+ 64_000_000,
+ "host1", "host2", "host3");
+
+ final byte[] ser1 = FileSourceSplitSerializer.INSTANCE.serialize(split);
+ final byte[] ser2 = FileSourceSplitSerializer.INSTANCE.serialize(split);
+
+ assertSame(ser1, ser2);
+ }
+
+ // ------------------------------------------------------------------------
+ // test utils
+ // ------------------------------------------------------------------------
+
+ private static FileSourceSplit serializeAndDeserialize(FileSourceSplit split) throws IOException {
+ final FileSourceSplitSerializer serializer = new FileSourceSplitSerializer();
+ final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, split);
+ return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+ }
+
+ static void assertSplitsEqual(FileSourceSplit expected, FileSourceSplit actual) {
+ assertEquals(expected.splitId(), actual.splitId());
+ assertEquals(expected.path(), actual.path());
+ assertEquals(expected.offset(), actual.offset());
+ assertEquals(expected.length(), actual.length());
+ assertArrayEquals(expected.hostnames(), actual.hostnames());
+ assertEquals(expected.getReaderPosition(), actual.getReaderPosition());
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java
new file mode 100644
index 0000000..95c704a
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitStateTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link FileSourceSplitState}.
+ */
+public class FileSourceSplitStateTest {
+
+ @Test
+ public void testRoundTripWithoutModification() {
+ final FileSourceSplit split = getTestSplit();
+ final FileSourceSplitState state = new FileSourceSplitState(split);
+
+ final FileSourceSplit resultSplit = state.toFileSourceSplit();
+
+ assertEquals(split.getReaderPosition(), resultSplit.getReaderPosition());
+ }
+
+ @Test
+ public void testStateStartsWithSplitValues() {
+ final FileSourceSplit split = getTestSplit(new CheckpointedPosition(123L, 456L));
+ final FileSourceSplitState state = new FileSourceSplitState(split);
+
+ assertEquals(123L, state.getOffset());
+ assertEquals(456L, state.getRecordsToSkipAfterOffset());
+ }
+
+ @Test
+ public void testNewSplitTakesModifiedOffsetAndCount() {
+ final FileSourceSplit split = getTestSplit();
+ final FileSourceSplitState state = new FileSourceSplitState(split);
+
+ state.setPosition(1234L, 7566L);
+ final Optional<CheckpointedPosition> position = state.toFileSourceSplit().getReaderPosition();
+
+ assertTrue(position.isPresent());
+ assertEquals(new CheckpointedPosition(1234L, 7566L), position.get());
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static FileSourceSplit getTestSplit() {
+ return getTestSplit(null);
+ }
+
+ private static FileSourceSplit getTestSplit(CheckpointedPosition position) {
+ return new FileSourceSplit(
+ "test-id",
+ new Path("file:/some/random/path"),
+ 17,
+ 121,
+ new String[] {"localhost"},
+ position);
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java
new file mode 100644
index 0000000..e58ef6f
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceSplitTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for the {@link FileSourceSplit}.
+ */
+public class FileSourceSplitTest {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void noNullHostsAllowed() {
+ new FileSourceSplit("id", new Path("file:/some/random/path"), 0, 10, "host1", null, "host2");
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
new file mode 100644
index 0000000..8876e7e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * MiniCluster-based integration test for the {@link FileSource}.
+ */
+public class FileSourceTextLinesITCase extends TestLogger {
+
+ private static final int PARALLELISM = 4;
+
+ @ClassRule
+ public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .build());
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * This test runs a job reading bounded input with a stream record format (text lines).
+ */
+ @Test
+ public void testBoundedTextFileSource() throws Exception {
+ final File testDir = TMP_FOLDER.newFolder();
+
+ // our main test data
+ writeAllFiles(testDir);
+
+ // write some junk to hidden files to test that common hidden file patterns are filtered out by default
+ writeHiddenJunkFiles(testDir);
+
+ final FileSource<String> source = FileSource
+ .forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+ .build();
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ final DataStream<String> stream = env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "file-source");
+
+ final List<String> result = DataStreamUtils.collectBoundedStream(stream, "Bounded TextFiles Test");
+
+ verifyResult(result);
+ }
+
+ /**
+ * This test runs a job reading continuous input (files appearing over time)
+ * with a stream record format (text lines).
+ */
+ @Test
+ public void testContinuousTextFileSource() throws Exception {
+ final File testDir = TMP_FOLDER.newFolder();
+
+ final FileSource<String> source = FileSource
+ .forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
+ .monitorContinuously(Duration.ofMillis(5))
+ .build();
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+
+ final DataStream<String> stream = env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "file-source");
+
+ final DataStreamUtils.ClientAndIterator<String> client =
+ DataStreamUtils.collectWithClient(stream, "Continuous TextFiles Monitoring Test");
+
+ // write one file, execute, and wait for its result
+ // that way we know that the application was running and the source has
+ // done its first chunk of work already
+
+ final int numLinesFirst = LINES_PER_FILE[0].length;
+ final int numLinesAfter = LINES.length - numLinesFirst;
+
+ writeFile(testDir, 0);
+ final List<String> result1 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesFirst);
+
+ // write the remaining files over time, after that collect the final result
+ for (int i = 1; i < LINES_PER_FILE.length; i++) {
+ Thread.sleep(10);
+ writeFile(testDir, i);
+ }
+
+ final List<String> result2 = DataStreamUtils.collectRecordsFromUnboundedStream(client, numLinesAfter);
+
+ // shut down the job, now that we have all the results we expected.
+ client.client.cancel().get();
+
+ result1.addAll(result2);
+ verifyResult(result1);
+ }
+
+ // ------------------------------------------------------------------------
+ // verification
+ // ------------------------------------------------------------------------
+
+ private static void verifyResult(List<String> lines) {
+ final String[] expected = Arrays.copyOf(LINES, LINES.length);
+ final String[] actual = lines.toArray(new String[0]);
+
+ Arrays.sort(expected);
+ Arrays.sort(actual);
+
+ assertThat(actual, equalTo(expected));
+ }
+
+ // ------------------------------------------------------------------------
+ // test data
+ // ------------------------------------------------------------------------
+
+ private static final String[] FILE_PATHS = new String[] {
+ "text.2",
+ "nested1/text.1",
+ "text.1",
+ "text.3",
+ "nested2/nested21/text",
+ "nested1/text.2",
+ "nested2/text"};
+
+ private static final String[] HIDDEN_JUNK_PATHS = new String[] {
+ // all file names here start with '.' or '_'
+ "_something",
+ ".junk",
+ "nested1/.somefile",
+ "othernested/_ignoredfile",
+ "_nested/file",
+ "nested1/.intermediate/somefile"};
+
+ private static final String[] LINES = new String[] {
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,",
+ "And by opposing end them?--To die,--to sleep,--",
+ "No more; and by a sleep to say we end",
+ "The heartache, and the thousand natural shocks",
+ "That flesh is heir to,--'tis a consummation",
+ "Devoutly to be wish'd. To die,--to sleep;--",
+ "To sleep! perchance to dream:--ay, there's the rub;",
+ "For in that sleep of death what dreams may come,",
+ "When we have shuffled off this mortal coil,",
+ "Must give us pause: there's the respect",
+ "That makes calamity of so long life;",
+ "For who would bear the whips and scorns of time,",
+ "The oppressor's wrong, the proud man's contumely,",
+ "The pangs of despis'd love, the law's delay,",
+ "The insolence of office, and the spurns",
+ "That patient merit of the unworthy takes,",
+ "When he himself might his quietus make",
+ "With a bare bodkin? who would these fardels bear,",
+ "To grunt and sweat under a weary life,",
+ "But that the dread of something after death,--",
+ "The undiscover'd country, from whose bourn",
+ "No traveller returns,--puzzles the will,",
+ "And makes us rather bear those ills we have",
+ "Than fly to others that we know not of?",
+ "Thus conscience does make cowards of us all;",
+ "And thus the native hue of resolution",
+ "Is sicklied o'er with the pale cast of thought;",
+ "And enterprises of great pith and moment,",
+ "With this regard, their currents turn awry,",
+ "And lose the name of action.--Soft you now!",
+ "The fair Ophelia!--Nymph, in thy orisons",
+ "Be all my sins remember'd."
+ };
+
+ private static final String[][] LINES_PER_FILE = splitLinesForFiles();
+
+ private static String[][] splitLinesForFiles() {
+ final String[][] result = new String[FILE_PATHS.length][];
+
+ final int linesPerFile = LINES.length / FILE_PATHS.length;
+ final int linesForLastFile = LINES.length - ((FILE_PATHS.length - 1) * linesPerFile);
+
+ int pos = 0;
+ for (int i = 0; i < FILE_PATHS.length - 1; i++) {
+ String[] lines = new String[linesPerFile];
+ result[i] = lines;
+ for (int k = 0; k < lines.length; k++) {
+ lines[k] = LINES[pos++];
+ }
+ }
+ String[] lines = new String[linesForLastFile];
+ result[result.length - 1] = lines;
+ for (int k = 0; k < lines.length; k++) {
+ lines[k] = LINES[pos++];
+ }
+ return result;
+ }
+
+ private static void writeFile(File testDir, int num) throws IOException {
+ final File file = new File(testDir, FILE_PATHS[num]);
+ final File parent = file.getParentFile();
+ assertTrue(parent.mkdirs() || parent.exists());
+
+ try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
+ for (String line : LINES_PER_FILE[num]) {
+ writer.println(line);
+ }
+ }
+ }
+
+ private static void writeAllFiles(File testDir) throws IOException {
+ for (int i = 0; i < FILE_PATHS.length; i++) {
+ writeFile(testDir, i);
+ }
+ }
+
+ private static void writeHiddenJunkFiles(File testDir) throws IOException {
+ for (String junkPath : HIDDEN_JUNK_PATHS) {
+ final File file = new File(testDir, junkPath);
+ final File parent = file.getParentFile();
+ assertTrue(parent.mkdirs() || parent.exists());
+
+ try (PrintWriter writer = new PrintWriter(new FileWriter(file))) {
+ writer.println("This should not end up in the test result.");
+ writer.println("Foo bar bazzl junk");
+ }
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
new file mode 100644
index 0000000..bc89dce
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Unit tests for the {@link PendingSplitsCheckpointSerializer}.
+ */
+public class PendingSplitsCheckpointSerializerTest {
+
+ @Test
+ public void serializeEmptyCheckpoint() throws Exception {
+ final PendingSplitsCheckpoint checkpoint =
+ PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
+
+ final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+ assertCheckpointsEqual(checkpoint, deSerialized);
+ }
+
+ @Test
+ public void serializeSomeSplits() throws Exception {
+ final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+ Arrays.asList(testSplit1(), testSplit2(), testSplit3()));
+
+ final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+ assertCheckpointsEqual(checkpoint, deSerialized);
+ }
+
+ @Test
+ public void serializeSplitsAndProcessedPaths() throws Exception {
+ final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+ Arrays.asList(testSplit1(), testSplit2(), testSplit3()),
+ Arrays.asList(new Path("file:/some/path"), new Path("s3://bucket/key/and/path"), new Path("hdfs://namenode:12345/path")));
+
+ final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+ assertCheckpointsEqual(checkpoint, deSerialized);
+ }
+
+ @Test
+ public void repeatedSerialization() throws Exception {
+ final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+ Arrays.asList(testSplit3(), testSplit1()));
+
+ serializeAndDeserialize(checkpoint);
+ serializeAndDeserialize(checkpoint);
+ final PendingSplitsCheckpoint deSerialized = serializeAndDeserialize(checkpoint);
+
+ assertCheckpointsEqual(checkpoint, deSerialized);
+ }
+
+ @Test
+ public void repeatedSerializationCaches() throws Exception {
+ final PendingSplitsCheckpoint checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(
+ Collections.singletonList(testSplit2()));
+
+ final byte[] ser1 = PendingSplitsCheckpointSerializer.INSTANCE.serialize(checkpoint);
+ final byte[] ser2 = PendingSplitsCheckpointSerializer.INSTANCE.serialize(checkpoint);
+
+ assertSame(ser1, ser2);
+ }
+
+ // ------------------------------------------------------------------------
+ // test utils
+ // ------------------------------------------------------------------------
+
+ private static FileSourceSplit testSplit1() {
+ return new FileSourceSplit(
+ "random-id",
+ new Path("hdfs://namenode:14565/some/path/to/a/file"),
+ 100_000_000,
+ 64_000_000,
+ "host1", "host2", "host3");
+ }
+
+ private static FileSourceSplit testSplit2() {
+ return new FileSourceSplit(
+ "some-id",
+ new Path("file:/some/path/to/a/file"),
+ 0,
+ 0);
+ }
+
+ private static FileSourceSplit testSplit3() {
+ return new FileSourceSplit(
+ "an-id",
+ new Path("s3://some-bucket/key/to/the/object"),
+ 0,
+ 1234567);
+ }
+
+ private static PendingSplitsCheckpoint serializeAndDeserialize(PendingSplitsCheckpoint checkpoint) throws IOException {
+ final PendingSplitsCheckpointSerializer serializer = new PendingSplitsCheckpointSerializer();
+ final byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, checkpoint);
+ return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
+ }
+
+ private static void assertCheckpointsEqual(PendingSplitsCheckpoint expected, PendingSplitsCheckpoint actual) {
+ assertOrderedCollectionEquals(expected.getSplits(), actual.getSplits(), FileSourceSplitSerializerTest::assertSplitsEqual);
+
+ assertOrderedCollectionEquals(
+ expected.getAlreadyProcessedPaths(), actual.getAlreadyProcessedPaths(), Assert::assertEquals);
+ }
+
+ private static <E> void assertOrderedCollectionEquals(
+ Collection<E> expected,
+ Collection<E> actual,
+ BiConsumer<E, E> equalityAsserter) {
+
+ assertEquals(expected.size(), actual.size());
+ final Iterator<E> expectedIter = expected.iterator();
+ final Iterator<E> actualIter = actual.iterator();
+ while (expectedIter.hasNext()) {
+ equalityAsserter.accept(expectedIter.next(), actualIter.next());
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java
new file mode 100644
index 0000000..24d0414
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveEnumeratorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Unit tests for the {@link BlockSplittingRecursiveEnumerator}.
+ */
+public class BlockSplittingRecursiveEnumeratorTest extends NonSplittingRecursiveEnumeratorTest {
+
+ @Test
+ @Override
+ public void testFileWithMultipleBlocks() throws Exception {
+ final Path testPath = new Path("testfs:///dir/file");
+ testFs = TestingFileSystem.createForFileStatus(
+ "testfs",
+ TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 1000L,
+ new TestingFileSystem.TestBlockLocation(0L, 100L, "host1", "host2"),
+ new TestingFileSystem.TestBlockLocation(100L, 520L, "host2", "host3"),
+ new TestingFileSystem.TestBlockLocation(620L, 380L, "host3", "host4")));
+ testFs.register();
+
+ final BlockSplittingRecursiveEnumerator enumerator = createEnumerator();
+ final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+ new Path[] { new Path("testfs:///dir")}, 0);
+
+ final Collection<FileSourceSplit> expected = Arrays.asList(
+ new FileSourceSplit("ignoredId", testPath, 0L, 100L, "host1", "host2"),
+ new FileSourceSplit("ignoredId", testPath, 100L, 520L, "host1", "host2"),
+ new FileSourceSplit("ignoredId", testPath, 620L, 380L, "host1", "host2"));
+
+ assertSplitsEqual(expected, splits);
+ }
+
+ protected BlockSplittingRecursiveEnumerator createEnumerator() {
+ return new BlockSplittingRecursiveEnumerator();
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java
new file mode 100644
index 0000000..9837975
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveEnumeratorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link NonSplittingRecursiveEnumerator}.
+ */
+public class NonSplittingRecursiveEnumeratorTest {
+
+ /** Testing file system reference, to be cleaned up in an @After method. That way it also gets
+ * cleaned up on a test failure, without needing finally clauses in every test. */
+ protected TestingFileSystem testFs;
+
+ @After
+ public void unregisterTestFs() throws Exception {
+ if (testFs != null) {
+ testFs.unregister();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testIncludeFilesFromNestedDirectories() throws Exception {
+ final Path[] testPaths = new Path[] {
+ new Path("testfs:///dir/file1"),
+ new Path("testfs:///dir/nested/file.out"),
+ new Path("testfs:///dir/nested/anotherfile.txt")};
+ testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+ testFs.register();
+
+ final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+ final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+ new Path[] { new Path("testfs:///dir")}, 1);
+
+ assertThat(toPaths(splits), containsInAnyOrder(testPaths));
+ }
+
+ @Test
+ public void testDefaultHiddenFilesFilter() throws Exception {
+ final Path[] testPaths = new Path[] {
+ new Path("testfs:///visiblefile"),
+ new Path("testfs:///.hiddenfile1"),
+ new Path("testfs:///_hiddenfile2")};
+ testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+ testFs.register();
+
+ final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+ final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+ new Path[] { new Path("testfs:///")}, 1);
+
+ assertEquals(Collections.singletonList(new Path("testfs:///visiblefile")), toPaths(splits));
+ }
+
+ @Test
+ public void testHiddenDirectories() throws Exception {
+ final Path[] testPaths = new Path[] {
+ new Path("testfs:///dir/visiblefile"),
+ new Path("testfs:///dir/.hiddendir/file"),
+ new Path("testfs:///_notvisible/afile")};
+ testFs = TestingFileSystem.createWithFiles("testfs", testPaths);
+ testFs.register();
+
+ final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+ final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+ new Path[] { new Path("testfs:///")}, 1);
+
+ assertEquals(Collections.singletonList(new Path("testfs:///dir/visiblefile")), toPaths(splits));
+ }
+
+ @Test
+ public void testFilesWithNoBlockInfo() throws Exception {
+ final Path testPath = new Path("testfs:///dir/file1");
+ testFs = TestingFileSystem.createForFileStatus(
+ "testfs",
+ TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 12345L));
+ testFs.register();
+
+ final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+ final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+ new Path[] { new Path("testfs:///dir")}, 0);
+
+ assertEquals(1, splits.size());
+ assertSplitsEqual(
+ new FileSourceSplit("ignoredId", testPath, 0L, 12345L),
+ splits.iterator().next());
+ }
+
+ @Test
+ public void testFileWithIncorrectBlocks() throws Exception {
+ final Path testPath = new Path("testfs:///testdir/testfile");
+
+ testFs = TestingFileSystem.createForFileStatus(
+ "testfs",
+ TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 10000L,
+ new TestingFileSystem.TestBlockLocation(0L, 1000L),
+ new TestingFileSystem.TestBlockLocation(2000L, 1000L)));
+ testFs.register();
+
+ final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+ final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+ new Path[] { new Path("testfs:///testdir")}, 0);
+
+ assertEquals(1, splits.size());
+ assertSplitsEqual(
+ new FileSourceSplit("ignoredId", testPath, 0L, 10000L),
+ splits.iterator().next());
+ }
+
+ @Test
+ public void testFileWithMultipleBlocks() throws Exception {
+ final Path testPath = new Path("testfs:///dir/file");
+ testFs = TestingFileSystem.createForFileStatus(
+ "testfs",
+ TestingFileSystem.TestFileStatus.forFileWithBlocks(testPath, 1000L,
+ new TestingFileSystem.TestBlockLocation(0L, 100L, "host1", "host2"),
+ new TestingFileSystem.TestBlockLocation(100L, 520L, "host2", "host3"),
+ new TestingFileSystem.TestBlockLocation(620L, 380L, "host3", "host4")));
+ testFs.register();
+
+ final NonSplittingRecursiveEnumerator enumerator = createEnumerator();
+ final Collection<FileSourceSplit> splits = enumerator.enumerateSplits(
+ new Path[] { new Path("testfs:///dir")}, 0);
+
+ assertSplitsEqual(
+ new FileSourceSplit("ignoredId", testPath, 0L, 1000L, "host1", "host2", "host3", "host4"),
+ splits.iterator().next());
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The instantiation of the enumerator is overridable, so that these tests can be reused for subclasses.
+ */
+ protected NonSplittingRecursiveEnumerator createEnumerator() {
+ return new NonSplittingRecursiveEnumerator();
+ }
+
+ // ------------------------------------------------------------------------
+
+ protected static void assertSplitsEqual(final FileSourceSplit expected, final FileSourceSplit actual) {
+ assertEquals(expected.path(), actual.path());
+ assertEquals(expected.offset(), actual.offset());
+ assertEquals(expected.length(), actual.length());
+ assertArrayEquals(expected.hostnames(), actual.hostnames());
+ }
+
+ protected static void assertSplitsEqual(
+ final Collection<FileSourceSplit> expected,
+ final Collection<FileSourceSplit> actual) {
+
+ assertEquals(expected.size(), actual.size());
+
+ final ArrayList<FileSourceSplit> expectedCopy = new ArrayList<>(expected);
+ final ArrayList<FileSourceSplit> actualCopy = new ArrayList<>(actual);
+ expectedCopy.sort(NonSplittingRecursiveEnumeratorTest::compareFileSourceSplit);
+ actualCopy.sort(NonSplittingRecursiveEnumeratorTest::compareFileSourceSplit);
+
+ for (int i = 0; i < expectedCopy.size(); i++) {
+ assertSplitsEqual(expectedCopy.get(i), actualCopy.get(i));
+ }
+ }
+
+ protected static Collection<Path> toPaths(Collection<FileSourceSplit> splits) {
+ return splits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
+ }
+
+ private static int compareFileSourceSplit(FileSourceSplit a, FileSourceSplit b) {
+ return Long.compare(a.offset(), b.offset());
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java
new file mode 100644
index 0000000..ea59379
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/AdapterTestBase.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for the format adapter tests, as used by {@link StreamFormatAdapterTest} and {@link FileRecordFormatAdapterTest}.
+ */
+public abstract class AdapterTestBase<FormatT> {
+
+ @ClassRule
+ public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
+
+ protected static final int NUM_NUMBERS = 100;
+ protected static final long FILE_LEN = 4 * NUM_NUMBERS;
+
+ protected static Path testPath;
+
+ @BeforeClass
+ public static void writeTestFile() throws IOException {
+ final File testFile = new File(TMP_DIR.getRoot(), "testFile");
+ testPath = Path.fromLocalFile(testFile);
+
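+ // write NUM_NUMBERS consecutive big-endian ints (4 bytes each), so the file
+ // is exactly FILE_LEN = 4 * NUM_NUMBERS bytes long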
+ try (DataOutputStream out = new DataOutputStream(new FileOutputStream(testFile))) {
+ for (int i = 0; i < NUM_NUMBERS; i++) {
+ out.writeInt(i);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // format specific instantiation
+ // ------------------------------------------------------------------------
+
+ protected abstract FormatT createCheckpointedFormat();
+
+ protected abstract FormatT createNonCheckpointedFormat();
+
+ protected abstract FormatT createFormatFailingInInstantiation();
+
+ protected abstract BulkFormat<Integer> wrapWithAdapter(FormatT format);
+
+ // ------------------------------------------------------------------------
+ // shared tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testRecoverCheckpointedFormatOneSplit() throws IOException {
+ testReading(createCheckpointedFormat(), 1, 5, 44);
+ }
+
+ @Test
+ public void testRecoverCheckpointedFormatMultipleSplits() throws IOException {
+ testReading(createCheckpointedFormat(), 3, 11, 33, 56);
+ }
+
+ @Test
+ public void testRecoverNonCheckpointedFormatOneSplit() throws IOException {
+ testReading(createNonCheckpointedFormat(), 1, 5, 44);
+ }
+
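+ /**
+ * Reads the file's numbers across the given number of splits, dropping the current
+ * reader after each boundary in {@code recoverAfterRecords} and restoring a new one
+ * from the last checkpointed position, to simulate failure/recovery cycles.
+ */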
+ private void testReading(FormatT format, int numSplits, int... recoverAfterRecords) throws IOException {
+ // add the end boundary for recovery
+ final int[] boundaries = Arrays.copyOf(recoverAfterRecords, recoverAfterRecords.length + 1);
+ boundaries[boundaries.length - 1] = NUM_NUMBERS;
+
+ // set a fetch size so that we get three records per fetch
+ final Configuration config = new Configuration();
+ config.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(10));
+
+ final BulkFormat<Integer> adapter = wrapWithAdapter(format);
+ final Queue<FileSourceSplit> splits = buildSplits(numSplits);
+ final List<Integer> result = new ArrayList<>();
+
+ FileSourceSplit currentSplit = null;
+ BulkFormat.Reader<Integer> currentReader = null;
+
+ for (int nextRecordToRecover : boundaries) {
+ final Tuple2<FileSourceSplit, CheckpointedPosition> toRecoverFrom = readNumbers(
+ currentReader, currentSplit,
+ adapter, splits, config,
+ result,
+ nextRecordToRecover - result.size());
+
+ currentSplit = toRecoverFrom.f0;
+ currentReader = adapter.restoreReader(config, currentSplit.path(), currentSplit.offset(), currentSplit.length(), toRecoverFrom.f1);
+ }
+
+ verifyIntListResult(result);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testClosesStreamIfReaderCreationFails() throws Exception {
+ // setup
+ final Path testPath = new Path("testFs:///testpath-1");
+ final CloseTestingInputStream in = new CloseTestingInputStream();
+ final TestingFileSystem testFs = TestingFileSystem.createForFileStatus("testFs",
+ TestingFileSystem.TestFileStatus.forFileWithStream(testPath, 1024, in));
+ testFs.register();
+
+ // test
+ final BulkFormat<Integer> adapter = wrapWithAdapter(createFormatFailingInInstantiation());
+ try {
+ adapter.createReader(new Configuration(), testPath, 0, 1024);
+ } catch (IOException ignored) {}
+
+ // assertions
+ assertTrue(in.closed);
+
+ // cleanup
+ testFs.unregister();
+ }
+
+ @Test
+ public void testClosesStreamIfReaderRestoreFails() throws Exception {
+ // setup
+ final Path testPath = new Path("testFs:///testpath-1");
+ final CloseTestingInputStream in = new CloseTestingInputStream();
+ final TestingFileSystem testFs = TestingFileSystem.createForFileStatus("testFs",
+ TestingFileSystem.TestFileStatus.forFileWithStream(testPath, 1024, in));
+ testFs.register();
+
+ // test
+ final BulkFormat<Integer> adapter = wrapWithAdapter(createFormatFailingInInstantiation());
+ try {
+ adapter.restoreReader(
+ new Configuration(), testPath, 0, 1024,
+ new CheckpointedPosition(0L, 5L));
+ } catch (IOException ignored) {}
+
+ // assertions
+ assertTrue(in.closed);
+
+ // cleanup
+ testFs.unregister();
+ }
+
+ // ------------------------------------------------------------------------
+ // test helpers
+ // ------------------------------------------------------------------------
+
+ protected static void verifyIntListResult(List<Integer> result) {
+ assertEquals("wrong result size", NUM_NUMBERS, result.size());
+ int nextExpected = 0;
+ for (int next : result) {
+ if (next != nextExpected++) {
+ fail("Wrong result: " + result);
+ }
+ }
+ }
+
+ protected static void readNumbers(BulkFormat.Reader<Integer> reader, List<Integer> result, int num) throws IOException {
+ readNumbers(reader, null, null, null, null, result, num);
+ }
+
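+ /**
+ * Reads up to {@code num} records, pulling the next split from the queue whenever
+ * the current reader is exhausted. Returns the split that was being read last,
+ * together with the checkpointed position of the last emitted record, suitable for
+ * handing to {@code restoreReader}.
+ */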
+ protected static Tuple2<FileSourceSplit, CheckpointedPosition> readNumbers(
+ BulkFormat.Reader<Integer> currentReader,
+ FileSourceSplit currentSplit,
+ BulkFormat<Integer> format,
+ Queue<FileSourceSplit> moreSplits,
+ Configuration config,
+ List<Integer> result,
+ int num) throws IOException {
+
+ long offset = Long.MIN_VALUE;
+ long skip = Long.MIN_VALUE;
+
+ // loop across splits
+ while (num > 0) {
+ if (currentReader == null) {
+ currentSplit = moreSplits.poll();
+ assertNotNull(currentSplit);
+ currentReader = format.createReader(config, currentSplit.path(), currentSplit.offset(), currentSplit.length());
+ }
+
+ // loop across batches
+ BulkFormat.RecordIterator<Integer> nextBatch;
+ while (num > 0 && (nextBatch = currentReader.readBatch()) != null) {
+
+ // loop across the records in the batch
+ RecordAndPosition<Integer> next;
+ while (num > 0 && (next = nextBatch.next()) != null) {
+ num--;
+ result.add(next.getRecord());
+ offset = next.getOffset();
+ skip = next.getRecordSkipCount();
+ }
+ }
+
+ currentReader.close();
+ currentReader = null;
+ }
+
+ return new Tuple2<>(currentSplit, new CheckpointedPosition(offset, skip));
+ }
+
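+ /**
+ * Splits the test file into {@code numSplits} contiguous ranges; the last split
+ * absorbs the remainder (e.g., FILE_LEN = 400 and 3 splits gives lengths 133, 133, 134).
+ */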
+ static Queue<FileSourceSplit> buildSplits(int numSplits) {
+ final Queue<FileSourceSplit> splits = new ArrayDeque<>();
+ final long rangeForSplit = FILE_LEN / numSplits;
+
+ for (int i = 0; i < numSplits - 1; i++) {
+ splits.add(new FileSourceSplit("ID-" + i, testPath, i * rangeForSplit, rangeForSplit));
+ }
+ final long startOfLast = (numSplits - 1) * rangeForSplit;
+ splits.add(new FileSourceSplit("ID-" + (numSplits - 1), testPath, startOfLast, FILE_LEN - startOfLast));
+ return splits;
+ }
+
+ // ------------------------------------------------------------------------
+ // Test Mocks and Stubs
+ // ------------------------------------------------------------------------
+
+ private static class CloseTestingInputStream extends FSDataInputStream {
+
+ boolean closed;
+
+ @Override
+ public void seek(long desired) throws IOException {}
+
+ @Override
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ closed = true;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java
new file mode 100644
index 0000000..779f38e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordFormatAdapterTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit and behavior tests for the {@link FileRecordFormatAdapter}.
+ */
+@SuppressWarnings("serial")
+public class FileRecordFormatAdapterTest extends AdapterTestBase<FileRecordFormat<Integer>> {
+
+ @Override
+ protected FileRecordFormat<Integer> createCheckpointedFormat() {
+ return new IntFileRecordFormat(true);
+ }
+
+ @Override
+ protected FileRecordFormat<Integer> createNonCheckpointedFormat() {
+ return new IntFileRecordFormat(false);
+ }
+
+ @Override
+ protected FileRecordFormat<Integer> createFormatFailingInInstantiation() {
+ return new FailingInstantiationFormat();
+ }
+
+ @Override
+ protected BulkFormat<Integer> wrapWithAdapter(FileRecordFormat<Integer> format) {
+ return new FileRecordFormatAdapter<>(format);
+ }
+
+ // ------------------------------------------------------------------------
+ // not applicable tests
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void testClosesStreamIfReaderCreationFails() throws Exception {
+ // not applicable: a FileRecordFormat opens its own stream inside createReader,
+ // so a failing instantiation leaves no adapter-owned stream to close
+ }
+
+ // ------------------------------------------------------------------------
+ // test mocks
+ // ------------------------------------------------------------------------
+
+ private static final class IntFileRecordFormat implements FileRecordFormat<Integer> {
+
+ private final boolean checkpointed;
+
+ IntFileRecordFormat(boolean checkpointed) {
+ this.checkpointed = checkpointed;
+ }
+
+ @Override
+ public Reader<Integer> createReader(
+ Configuration config,
+ Path filePath,
+ long splitOffset,
+ long splitLength) throws IOException {
+
+ final FileSystem fs = filePath.getFileSystem();
+ final FileStatus status = fs.getFileStatus(filePath);
+ final FSDataInputStream in = fs.open(filePath);
+
+ final long fileLen = status.getLen();
+ final long splitEnd = splitOffset + splitLength;
+
+ assertEquals("invalid file length", 0, fileLen % 4);
+
+ // round all positions to the next integer boundary
+ // to simulate common split behavior, we round up to the next int boundary even
+ // when we are already at a perfect boundary; the only exceptions are the very
+ // start and the very end of the file
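+ // (for example, a splitOffset of 33 becomes start 36, and a splitOffset of 36 becomes 40)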
+ final long start = splitOffset == 0L ? 0L : splitOffset + 4 - splitOffset % 4;
+ final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+ in.seek(start);
+
+ return new TestIntReader(in, end, checkpointed);
+ }
+
+ @Override
+ public Reader<Integer> restoreReader(
+ Configuration config,
+ Path filePath,
+ long restoredOffset,
+ long splitOffset,
+ long splitLength) throws IOException {
+
+ final FileSystem fs = filePath.getFileSystem();
+ final FileStatus status = fs.getFileStatus(filePath);
+ final FSDataInputStream in = fs.open(filePath);
+
+ final long fileLen = status.getLen();
+ final long splitEnd = splitOffset + splitLength;
+
+ assertEquals("invalid file length", 0, fileLen % 4);
+
+ // round end position to the next integer boundary
+ final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+ // no rounding of checkpointed offset
+ in.seek(restoredOffset);
+ return new TestIntReader(in, end, checkpointed);
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return true;
+ }
+
+ @Override
+ public TypeInformation<Integer> getProducedType() {
+ return Types.INT;
+ }
+ }
+
+ private static final class FailingInstantiationFormat implements FileRecordFormat<Integer> {
+
+ @Override
+ public Reader<Integer> createReader(
+ Configuration config,
+ Path filePath,
+ long splitOffset,
+ long splitLength) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Reader<Integer> restoreReader(
+ Configuration config,
+ Path filePath,
+ long restoredOffset,
+ long splitOffset,
+ long splitLength) throws IOException {
+
+ final FSDataInputStream in = filePath.getFileSystem().open(filePath);
+ return new FailingReader(in);
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Integer> getProducedType() {
+ return Types.INT;
+ }
+
+ private static final class FailingReader implements FileRecordFormat.Reader<Integer> {
+
+ private final FSDataInputStream stream;
+
+ FailingReader(FSDataInputStream stream) {
+ this.stream = stream;
+ }
+
+ @Nullable
+ @Override
+ public Integer read() throws IOException {
+ throw new IOException("test exception");
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ }
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java
new file mode 100644
index 0000000..42bc443
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/FileRecordsTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.connector.file.src.util.SingletonResultIterator;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link FileRecords} class.
+ */
+public class FileRecordsTest {
+
+ @Test
+ public void testEmptySplits() {
+ final String split = "empty";
+ final FileRecords<Object> records = FileRecords.finishedSplit(split);
+
+ assertEquals(Collections.singleton(split), records.finishedSplits());
+ }
+
+ @Test
+ public void testMoveToFirstSplit() {
+ final String splitId = "splitId";
+ final FileRecords<Object> records = FileRecords.forRecords(splitId, new SingletonResultIterator<>());
+
+ final String firstSplitId = records.nextSplit();
+
+ assertEquals(splitId, firstSplitId);
+ }
+
+ @Test
+ public void testMoveToSecondSplit() {
+ final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+ records.nextSplit();
+
+ final String secondSplitId = records.nextSplit();
+
+ assertNull(secondSplitId);
+ }
+
+ @Test
+ public void testRecordsFromFirstSplit() {
+ final SingletonResultIterator<String> iter = new SingletonResultIterator<>();
+ iter.set("test", 18, 99);
+ final FileRecords<String> records = FileRecords.forRecords("splitId", iter);
+ records.nextSplit();
+
+ final RecordAndPosition<String> recAndPos = records.nextRecordFromSplit();
+
+ assertEquals("test", recAndPos.getRecord());
+ assertEquals(18, recAndPos.getOffset());
+ assertEquals(99, recAndPos.getRecordSkipCount());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRecordsInitiallyIllegal() {
+ final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+
+ records.nextRecordFromSplit();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRecordsOnSecondSplitIllegal() {
+ final FileRecords<Object> records = FileRecords.forRecords("splitId", new SingletonResultIterator<>());
+ records.nextSplit();
+ records.nextSplit();
+
+ records.nextRecordFromSplit();
+ }
+
+ @Test
+ public void testRecycleExhaustedBatch() {
+ final AtomicBoolean recycled = new AtomicBoolean(false);
+ final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+ iter.set(new Object(), 1L, 2L);
+
+ final FileRecords<Object> records = FileRecords.forRecords("test split", iter);
+ records.nextSplit();
+ records.nextRecordFromSplit();
+
+ // make sure we exhausted the iterator
+ assertNull(records.nextRecordFromSplit());
+ assertNull(records.nextSplit());
+
+ records.recycle();
+ assertTrue(recycled.get());
+ }
+
+ @Test
+ public void testRecycleNonExhaustedBatch() {
+ final AtomicBoolean recycled = new AtomicBoolean(false);
+ final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+ iter.set(new Object(), 1L, 2L);
+
+ final FileRecords<Object> records = FileRecords.forRecords("test split", iter);
+ records.nextSplit();
+
+ records.recycle();
+ assertTrue(recycled.get());
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java
new file mode 100644
index 0000000..29e74b1
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StreamFormatAdapterTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit and behavior tests for the {@link StreamFormatAdapter}.
+ */
+@SuppressWarnings("serial")
+public class StreamFormatAdapterTest extends AdapterTestBase<StreamFormat<Integer>> {
+
+ // ------------------------------------------------------------------------
+ // Factories for Shared Tests
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected StreamFormat<Integer> createCheckpointedFormat() {
+ return new CheckpointedIntFormat();
+ }
+
+ @Override
+ protected StreamFormat<Integer> createNonCheckpointedFormat() {
+ return new NonCheckpointedIntFormat();
+ }
+
+ @Override
+ protected StreamFormat<Integer> createFormatFailingInInstantiation() {
+ return new FailingInstantiationFormat();
+ }
+
+ @Override
+ protected BulkFormat<Integer> wrapWithAdapter(StreamFormat<Integer> format) {
+ return new StreamFormatAdapter<>(format);
+ }
+
+ // ------------------------------------------------------------------------
+ // Additional Unit Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testReadSmallBatchSize() throws IOException {
+ simpleReadTest(1);
+ }
+
+ @Test
+ public void testBatchSizeMatchesOneRecord() throws IOException {
+ simpleReadTest(4);
+ }
+
+ @Test
+ public void testBatchSizeIsRecordMultiple() throws IOException {
+ simpleReadTest(20);
+ }
+
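+ /**
+ * Reads the whole file with the given fetch size (in bytes). Judging by the shared
+ * tests, the adapter keeps fetching until at least that many bytes were consumed,
+ * so sizes of 1 and 4 yield one 4-byte record per batch, while 20 yields five.
+ */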
+ private void simpleReadTest(int batchSize) throws IOException {
+ final Configuration config = new Configuration();
+ config.set(StreamFormat.FETCH_IO_SIZE, new MemorySize(batchSize));
+ final StreamFormatAdapter<Integer> format = new StreamFormatAdapter<>(new CheckpointedIntFormat());
+ final BulkFormat.Reader<Integer> reader = format.createReader(config, testPath, 0L, FILE_LEN);
+
+ final List<Integer> result = new ArrayList<>();
+ readNumbers(reader, result, NUM_NUMBERS);
+
+ verifyIntListResult(result);
+ }
+
+ // ------------------------------------------------------------------------
+ // test mocks
+ // ------------------------------------------------------------------------
+
+ private static final class CheckpointedIntFormat implements StreamFormat<Integer> {
+
+ @Override
+ public Reader<Integer> createReader(
+ Configuration config,
+ FSDataInputStream stream,
+ long fileLen,
+ long splitEnd) throws IOException {
+
+ assertEquals("invalid file length", 0, fileLen % 4);
+
+ // round all positions to the next integer boundary
+ // to simulate common split behavior, we round up to the next int boundary even
+ // when we are already at a perfect boundary; the only exceptions are the very
+ // start and the very end of the file
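+ // (for example, a position of 33 becomes start 36, and a position of 36 becomes 40)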
+ final long currPos = stream.getPos();
+ final long start = currPos == 0L ? 0L : currPos + 4 - currPos % 4;
+ final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+ stream.seek(start);
+
+ return new TestIntReader(stream, end, true);
+ }
+
+ @Override
+ public Reader<Integer> restoreReader(
+ Configuration config,
+ FSDataInputStream stream,
+ long restoredOffset,
+ long fileLen,
+ long splitEnd) throws IOException {
+
+ assertEquals("invalid file length", 0, fileLen % 4);
+
+ // round end position to the next integer boundary
+ final long end = splitEnd == fileLen ? fileLen : splitEnd + 4 - splitEnd % 4;
+ // no rounding of checkpointed offset
+ stream.seek(restoredOffset);
+ return new TestIntReader(stream, end, true);
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return true;
+ }
+
+ @Override
+ public TypeInformation<Integer> getProducedType() {
+ return Types.INT;
+ }
+ }
+
+ private static final class NonCheckpointedIntFormat extends SimpleStreamFormat<Integer> {
+
+ @Override
+ public Reader<Integer> createReader(Configuration config, FSDataInputStream stream) throws IOException {
+ return new TestIntReader(stream, Long.MAX_VALUE, false);
+ }
+
+ @Override
+ public TypeInformation<Integer> getProducedType() {
+ return Types.INT;
+ }
+ }
+
+ private static final class FailingInstantiationFormat implements StreamFormat<Integer> {
+
+ @Override
+ public Reader<Integer> createReader(
+ Configuration config,
+ FSDataInputStream stream,
+ long fileLen,
+ long splitEnd) throws IOException {
+ throw new IOException("test exception");
+ }
+
+ @Override
+ public Reader<Integer> restoreReader(
+ Configuration config,
+ FSDataInputStream stream,
+ long restoredOffset,
+ long fileLen,
+ long splitEnd) throws IOException {
+ throw new IOException("test exception");
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Integer> getProducedType() {
+ return Types.INT;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java
new file mode 100644
index 0000000..b37e02e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/TestIntReader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.impl;
+
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.connector.file.src.util.CheckpointedPosition;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ * Simple reader for integers, that is both a {@link StreamFormat.Reader} and a {@link FileRecordFormat.Reader}.
+ */
+class TestIntReader implements StreamFormat.Reader<Integer>, FileRecordFormat.Reader<Integer> {
+
+ private static final int SKIPS_PER_OFFSET = 7;
+
+ private final FSDataInputStream in;
+ private final DataInputStream din;
+
+ private final long endOffset;
+ private long currentOffset;
+ private long currentSkipCount;
+
+ private final boolean checkpointed;
+
+ TestIntReader(FSDataInputStream in, long endOffset, boolean checkpointsPosition) throws IOException {
+ this.in = in;
+ this.endOffset = endOffset;
+ this.currentOffset = in.getPos();
+ this.din = new DataInputStream(in);
+ this.checkpointed = checkpointsPosition;
+ }
+
+ @Nullable
+ @Override
+ public Integer read() throws IOException {
+ if (in.getPos() >= endOffset) {
+ return null;
+ }
+
+ try {
+ final int next = din.readInt();
+ incrementPosition();
+ return next;
+ } catch (EOFException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Nullable
+ @Override
+ public CheckpointedPosition getCheckpointedPosition() {
+ return checkpointed ? new CheckpointedPosition(currentOffset, currentSkipCount) : null;
+ }
+
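+ /**
+ * Advances the checkpointable position: the offset only moves forward once every
+ * {@code SKIPS_PER_OFFSET} records, with the records read since then reported as a
+ * skip count, mimicking formats whose seekable positions are coarser than records.
+ */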
+ private void incrementPosition() {
+ currentSkipCount++;
+ if (currentSkipCount >= SKIPS_PER_OFFSET) {
+ currentOffset += 4 * currentSkipCount;
+ currentSkipCount = 0;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java
new file mode 100644
index 0000000..3596024
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/testutils/TestingFileSystem.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.testutils;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link FileSystem} for tests containing a pre-defined set of files and directories.
+ *
+ * <p>The file system can be registered under its scheme at the file system registry, so that
+ * the {@link Path#getFileSystem()} method finds this test file system. Use the
+ * {@link TestingFileSystem#register()} method to do that, and don't forget to
+ * {@link TestingFileSystem#unregister()} when the test is done.
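+ *
+ * <p>A typical usage sketch (the scheme and path here are illustrative):
+ * <pre>{@code
+ * TestingFileSystem fs = TestingFileSystem.createWithFiles("testfs", new Path("testfs:///dir/file"));
+ * fs.register();
+ * try {
+ *     // exercise code that resolves "testfs://..." paths via Path#getFileSystem()
+ * } finally {
+ *     fs.unregister();
+ * }
+ * }</pre>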
+ */
+public class TestingFileSystem extends FileSystem {
+
+ private final String scheme;
+
+ private final Map<Path, Collection<FileStatus>> directories;
+
+ private final Map<Path, TestFileStatus> files;
+
+ private TestingFileSystem(
+ final String scheme,
+ final Map<Path, Collection<FileStatus>> directories,
+ final Map<Path, TestFileStatus> files) {
+ this.scheme = scheme;
+ this.directories = directories;
+ this.files = files;
+ }
+
+ // ------------------------------------------------------------------------
+ // Factories
+ // ------------------------------------------------------------------------
+
+ public static TestingFileSystem createWithFiles(final String scheme, final Path... files) {
+ return createWithFiles(scheme, Arrays.asList(files));
+ }
+
+ public static TestingFileSystem createWithFiles(final String scheme, final Collection<Path> files) {
+ checkNotNull(scheme, "scheme");
+ checkNotNull(files, "files");
+
+ final Collection<TestFileStatus> status = files.stream()
+ .map((path) -> TestFileStatus.forFileWithDefaultBlock(path, 10L << 20))
+ .collect(Collectors.toList());
+
+ return createForFileStatus(scheme, status);
+ }
+
+ public static TestingFileSystem createForFileStatus(final String scheme, final TestFileStatus... files) {
+ return createForFileStatus(scheme, Arrays.asList(files));
+ }
+
+ public static TestingFileSystem createForFileStatus(final String scheme, final Collection<TestFileStatus> files) {
+ checkNotNull(scheme, "scheme");
+ checkNotNull(files, "files");
+
+ final HashMap<Path, TestFileStatus> fileMap = new HashMap<>(files.size());
+ final HashMap<Path, Collection<FileStatus>> directories = new HashMap<>();
+
+ for (TestFileStatus file : files) {
+ if (fileMap.putIfAbsent(file.getPath(), file) != null) {
+ throw new IllegalStateException("Already have a status for path " + file.getPath());
+ }
+ addParentDirectories(file, fileMap, directories);
+ }
+
+ return new TestingFileSystem(scheme, directories, fileMap);
+ }
+
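+ /**
+ * Recursively registers the given file's parent directories, so that every level
+ * of the directory tree above the file can later be enumerated via {@code listStatus}.
+ */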
+ private static void addParentDirectories(
+ final TestFileStatus file,
+ final Map<Path, TestFileStatus> files,
+ final Map<Path, Collection<FileStatus>> directories) {
+
+ final Path parentPath = file.getPath().getParent();
+ if (parentPath == null) {
+ return;
+ }
+
+ final TestFileStatus parentStatus = TestFileStatus.forDirectory(parentPath);
+ directories.computeIfAbsent(parentPath, (key) -> new ArrayList<>()).add(file);
+
+ final TestFileStatus existingParent = files.putIfAbsent(parentPath, parentStatus);
+ if (existingParent == null) {
+ addParentDirectories(parentStatus, files, directories);
+ } else {
+ checkArgument(existingParent.isDir(), "already have a file status for the directory path");
+ }
+ }
+
+
+ // ------------------------------------------------------------------------
+ // Test Utility
+ // ------------------------------------------------------------------------
+
+ public void register() throws Exception {
+ final Object key = createFsKey(scheme);
+ final Map<Object, Object> fsMap = getFsRegistry();
+ fsMap.put(key, this);
+ }
+
+ public void unregister() throws Exception {
+ final Object key = createFsKey(scheme);
+ final Map<Object, Object> fsMap = getFsRegistry();
+ fsMap.remove(key);
+ }
+
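+ // Registration works by reflectively inserting this instance into FileSystem's
+ // private CACHE map under an FSKey for the scheme, deliberately bypassing the
+ // public loading mechanism so tests can install a file system per scheme.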
+ private static Object createFsKey(String scheme) throws Exception {
+ final Class<?> fsKeyClass = Class.forName("org.apache.flink.core.fs.FileSystem$FSKey");
+ final Constructor<?> ctor = fsKeyClass.getConstructor(String.class, String.class);
+ ctor.setAccessible(true);
+ return ctor.newInstance(scheme, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map<Object, Object> getFsRegistry() throws Exception {
+ final Field cacheField = FileSystem.class.getDeclaredField("CACHE");
+ cacheField.setAccessible(true);
+ return (Map<Object, Object>) cacheField.get(null);
+ }
+
+ // ------------------------------------------------------------------------
+ // File System Methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public Path getWorkingDirectory() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Path getHomeDirectory() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public URI getUri() {
+ return URI.create(scheme + "://");
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ final FileStatus status = files.get(f);
+ if (status != null) {
+ return status;
+ } else {
+ throw new FileNotFoundException("File not found: " + f);
+ }
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+ final TestFileStatus status = file instanceof TestFileStatus
+ ? (TestFileStatus) file
+ : files.get(file.getPath());
+
+ if (status == null) {
+ throw new FileNotFoundException(file.getPath().toString());
+ }
+
+ return status.getBlocks();
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return open(f);
+ }
+
+ @Override
+ public FSDataInputStream open(Path f) throws IOException {
+ final TestFileStatus status = (TestFileStatus) getFileStatus(f);
+ if (status.stream != null) {
+ return status.stream;
+ } else {
+ throw new UnsupportedOperationException("No stream registered for this file");
+ }
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws IOException {
+ final Collection<FileStatus> dirContents = directories.get(f);
+ if (dirContents != null) {
+ return dirContents.toArray(new FileStatus[dirContents.size()]);
+ } else {
+ throw new FileNotFoundException("Directory not found: " + f);
+ }
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean mkdirs(Path f) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDistributedFS() {
+ return false;
+ }
+
+ @Override
+ public FileSystemKind getKind() {
+ return FileSystemKind.FILE_SYSTEM;
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Test implementation of a {@link FileStatus}.
+ */
+ public static final class TestFileStatus implements FileStatus {
+
+ private static final long TIME = System.currentTimeMillis();
+
+ public static TestFileStatus forFileWithDefaultBlock(final Path path, final long len) {
+ return forFileWithBlocks(path, len, new TestBlockLocation(0L, len));
+ }
+
+ public static TestFileStatus forFileWithBlocks(final Path path, final long len, final BlockLocation... blocks) {
+ checkNotNull(blocks);
+ return new TestFileStatus(path, len, false, blocks, null);
+ }
+
+ public static TestFileStatus forFileWithStream(final Path path, final long len, final FSDataInputStream stream) {
+ return new TestFileStatus(path, len, false, new BlockLocation[] {new TestBlockLocation(0L, len)}, stream);
+ }
+
+ public static TestFileStatus forDirectory(final Path path) {
+ return new TestFileStatus(path, 4096L, true, null, null);
+ }
+
+ // ------------------------------------------------
+
+ private final Path path;
+ private final long len;
+ private final boolean isDir;
+ @Nullable private final BlockLocation[] blocks;
+ @Nullable private final FSDataInputStream stream;
+
+ private TestFileStatus(
+ final Path path,
+ final long len,
+ final boolean isDir,
+ @Nullable final BlockLocation[] blocks,
+ @Nullable final FSDataInputStream stream) {
+ this.path = path;
+ this.len = len;
+ this.isDir = isDir;
+ this.blocks = blocks;
+ this.stream = stream;
+ }
+
+ @Override
+ public long getLen() {
+ return len;
+ }
+
+ @Override
+ public long getBlockSize() {
+ return 64 << 20;
+ }
+
+ @Override
+ public short getReplication() {
+ return 1;
+ }
+
+ @Override
+ public long getModificationTime() {
+ return TIME;
+ }
+
+ @Override
+ public long getAccessTime() {
+ return TIME;
+ }
+
+ @Override
+ public boolean isDir() {
+ return isDir;
+ }
+
+ @Override
+ public Path getPath() {
+ return path;
+ }
+
+ public BlockLocation[] getBlocks() {
+ return blocks;
+ }
+
+ @Override
+ public String toString() {
+ return path.toString() + (isDir ? " [DIR]" : " [FILE]");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Test implementation of a {@link BlockLocation}.
+ */
+ public static final class TestBlockLocation implements BlockLocation {
+
+ private final String[] hosts;
+ private final long offset;
+ private final long length;
+
+ public TestBlockLocation(long offset, long length, String... hosts) {
+ checkArgument(offset >= 0);
+ checkArgument(length >= 0);
+ this.offset = offset;
+ this.length = length;
+ this.hosts = checkNotNull(hosts);
+ }
+
+ @Override
+ public String[] getHosts() throws IOException {
+ return hosts;
+ }
+
+ @Override
+ public long getOffset() {
+ return offset;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public int compareTo(BlockLocation o) {
+ return 0;
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java
new file mode 100644
index 0000000..a46ffcd
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/ArrayResultIteratorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link ArrayResultIterator}.
+ */
+public class ArrayResultIteratorTest {
+
+ @Test
+ public void testEmptyConstruction() {
+ final ArrayResultIterator<Object> iter = new ArrayResultIterator<>();
+ assertNull(iter.next());
+ }
+
+ @Test
+ public void testGetElements() {
+ final String[] elements = new String[] { "1", "2", "3", "4"};
+ final long initialPos = 1422;
+ final long initialSkipCount = 17;
+
+ final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+ iter.set(elements, elements.length, initialPos, initialSkipCount);
+
+ for (int i = 0; i < elements.length; i++) {
+ final RecordAndPosition<String> recAndPos = iter.next();
+ assertEquals(elements[i], recAndPos.getRecord());
+ assertEquals(initialPos, recAndPos.getOffset());
+ assertEquals(initialSkipCount + i + 1, recAndPos.getRecordSkipCount());
+ }
+ }
+
+ @Test
+ public void testExhausted() {
+ final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+ iter.set(new String[] { "1", "2"}, 2, 0L, 0L);
+
+ iter.next();
+ iter.next();
+
+ assertNull(iter.next());
+ }
+
+ @Test
+ public void testArraySubRange() {
+ final ArrayResultIterator<String> iter = new ArrayResultIterator<>();
+ iter.set(new String[] { "1", "2", "3"}, 2, 0L, 0L);
+
+ assertNotNull(iter.next());
+ assertNotNull(iter.next());
+ assertNull(iter.next());
+ }
+
+ @Test
+ public void testNoRecycler() {
+ final ArrayResultIterator<Object> iter = new ArrayResultIterator<>();
+ // must not throw when no recycler was configured
+ iter.releaseBatch();
+ }
+
+ @Test
+ public void testRecycler() {
+ final AtomicBoolean recycled = new AtomicBoolean();
+ final ArrayResultIterator<Object> iter = new ArrayResultIterator<>(() -> recycled.set(true));
+
+ iter.releaseBatch();
+
+ assertTrue(recycled.get());
+ }
+}
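
The tests above pin down the ArrayResultIterator contract: set(...) hands over a
batch of records together with the batch's file offset and a running skip count,
next() returns records wrapped in RecordAndPosition until the batch is exhausted,
and releaseBatch() invokes the optional recycler. A minimal usage sketch, not part
of the patch and based only on the API exercised by these tests (record values and
offsets are made up for illustration):

    import org.apache.flink.connector.file.src.util.ArrayResultIterator;
    import org.apache.flink.connector.file.src.util.RecordAndPosition;

    public class ArrayResultIteratorExample {
        public static void main(String[] args) {
            // The optional recycler runs on releaseBatch(), e.g. to return a
            // reused read buffer to a pool.
            final ArrayResultIterator<String> batch =
                    new ArrayResultIterator<>(() -> System.out.println("recycled"));

            // Three records read from file offset 4096; the skip count starts at 0
            // and is incremented for each record handed out, so a recovering reader
            // knows how many records of the batch to skip.
            batch.set(new String[] {"a", "b", "c"}, 3, 4096L, 0L);

            RecordAndPosition<String> rec;
            while ((rec = batch.next()) != null) {
                System.out.println(rec.getRecord() + " @ offset " + rec.getOffset()
                        + ", skip count " + rec.getRecordSkipCount());
            }
            batch.releaseBatch(); // triggers the recycler
        }
    }
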
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java
new file mode 100644
index 0000000..e355e79
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/IteratorResultIteratorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Unit tests for the {@link IteratorResultIterator}.
+ */
+public class IteratorResultIteratorTest {
+
+ @Test
+ public void testGetElements() {
+ final String[] elements = new String[] {"1", "2", "3", "4"};
+ final long initialPos = 1422;
+ final long initialSkipCount = 17;
+
+ final IteratorResultIterator<String> iter = new IteratorResultIterator<>(
+ Arrays.asList(elements).iterator(), initialPos, initialSkipCount);
+
+ for (int i = 0; i < elements.length; i++) {
+ final RecordAndPosition<String> recAndPos = iter.next();
+ assertEquals(elements[i], recAndPos.getRecord());
+ assertEquals(initialPos, recAndPos.getOffset());
+ assertEquals(initialSkipCount + i + 1, recAndPos.getRecordSkipCount());
+ }
+ }
+
+ @Test
+ public void testExhausted() {
+ final IteratorResultIterator<String> iter = new IteratorResultIterator<>(
+ Arrays.asList("1", "2").iterator(), 0L, 0L);
+
+ iter.next();
+ iter.next();
+
+ assertNull(iter.next());
+ }
+}
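
IteratorResultIterator plays the same role for records that already arrive as a
java.util.Iterator: all records report the batch's offset, and the skip count
advances per record. A brief sketch under the same caveats as above (values are
illustrative only):

    import org.apache.flink.connector.file.src.util.IteratorResultIterator;
    import org.apache.flink.connector.file.src.util.RecordAndPosition;

    import java.util.Arrays;

    public class IteratorResultIteratorExample {
        public static void main(String[] args) {
            final IteratorResultIterator<String> iter = new IteratorResultIterator<>(
                    Arrays.asList("a", "b", "c").iterator(), 2048L, 0L);

            RecordAndPosition<String> rec;
            while ((rec = iter.next()) != null) {
                System.out.println(rec.getRecord() + " @ " + rec.getOffset()
                        + " / " + rec.getRecordSkipCount());
            }
        }
    }
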
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java
new file mode 100644
index 0000000..e1bbcd4
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/util/SingletonResultIteratorTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the {@link SingletonResultIterator}.
+ */
+public class SingletonResultIteratorTest {
+
+ @Test
+ public void testEmptyConstruction() {
+ final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+ assertNull(iter.next());
+ }
+
+ @Test
+ public void testGetElement() {
+ final Object element = new Object();
+ final long pos = 1422;
+ final long skipCount = 17;
+
+ final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+ iter.set(element, pos, skipCount);
+
+ final RecordAndPosition<Object> record = iter.next();
+ assertNotNull(record);
+ assertEquals(element, record.getRecord());
+ assertEquals(pos, record.getOffset());
+ assertEquals(skipCount, record.getRecordSkipCount());
+ }
+
+ @Test
+ public void testExhausted() {
+ final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+ iter.set(new Object(), 1, 2);
+ iter.next();
+
+ assertNull(iter.next());
+ }
+
+ @Test
+ public void testNoRecycler() {
+ final SingletonResultIterator<Object> iter = new SingletonResultIterator<>();
+ // must not throw when no recycler was configured
+ iter.releaseBatch();
+ }
+
+ @Test
+ public void testRecycler() {
+ final AtomicBoolean recycled = new AtomicBoolean();
+ final SingletonResultIterator<Object> iter = new SingletonResultIterator<>(() -> recycled.set(true));
+
+ iter.releaseBatch();
+
+ assertTrue(recycled.get());
+ }
+}
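
SingletonResultIterator covers formats that produce one record per batch. Note the
difference these tests document: the skip count is reported exactly as passed to
set(...), not incremented as in the array/iterator variants. A short sketch
(illustrative values only, not part of the patch):

    import org.apache.flink.connector.file.src.util.RecordAndPosition;
    import org.apache.flink.connector.file.src.util.SingletonResultIterator;

    public class SingletonResultIteratorExample {
        public static void main(String[] args) {
            final SingletonResultIterator<String> iter = new SingletonResultIterator<>();
            iter.set("only-record", 512L, 7L);

            final RecordAndPosition<String> rec = iter.next();
            System.out.println(rec.getRecord() + " @ " + rec.getOffset()
                    + " / " + rec.getRecordSkipCount()); // prints "... @ 512 / 7"

            System.out.println(iter.next()); // null - the iterator is exhausted
        }
    }
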
diff --git a/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..b4d2ba9
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF so that build logs are not flooded;
+# set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 8a0d867..9401351 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -55,6 +55,7 @@ under the License.
<module>flink-connector-gcp-pubsub</module>
<module>flink-connector-kinesis</module>
<module>flink-connector-base</module>
+ <module>flink-connector-files</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index b8a005c..d22b0f1 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -144,6 +144,12 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- Default file system support. The Hadoop and MapR dependencies -->
<!-- are optional, so not being added to the dist jar -->
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 1af572c..05b3216 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -173,11 +173,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
sourceReader.addSplits(splits);
}
- // Start the reader.
- sourceReader.start();
// Register the reader to the coordinator.
registerReader();
+ // Start the reader only after registration; messages sent from start() can then reach the coordinator.
+ sourceReader.start();
+
eventTimeLogic.startPeriodicWatermarkEmits();
}
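
For illustration, a fragment of a hypothetical SourceReader (not part of the
patch) that depends on this ordering, assuming the reader holds a field named
context of type SourceReaderContext:

    // Because SourceOperator now registers the reader with the coordinator
    // before calling start(), a split request sent here is not lost.
    @Override
    public void start() {
        context.sendSplitRequest();
    }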