Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/29 23:45:03 UTC

[GitHub] [flink] StephanEwen opened a new pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

StephanEwen opened a new pull request #13847:
URL: https://github.com/apache/flink/pull/13847


   ## What is the purpose of the change
   
   This refactors several parts of the new File Source API to support classes that need to subclass `FileSourceSplit` in order to add extra information to the splits.
   
   Examples are the Hive Source and the Iceberg Source.
   
   The changes affect only the not-yet-released File Source API and therefore raise no compatibility issues.
   
   ## Brief change log
   
     - [FLINK-19800] Make the interaction between `FileSourceSplit` and `FileSourceSplitState` extensible, to support different split types in the reader.
     - [FLINK-19803] Make `PendingSplitsCheckpoint` and its serializer generic, to support subclasses of `FileSourceSplit`.
     - [FLINK-19802] Change `BulkFormat.createReader(...)` and `BulkFormat.restoreReader(...)` to accept `FileSourceSplit` (or its subclasses) directly, so that extra information can be passed via subclasses of `FileSourceSplit`.
     - [FLINK-19804] Introduce an `AbstractFileSource` that is generic with respect to the split type, and change `FileSource` to extend it for the specific case of using `FileSourceSplit`. This keeps the generics simpler for the common case (using `FileSource`).
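   
   The core idea of the change log, a format that is generic over the split type, can be illustrated with a small self-contained sketch. It deliberately does not use Flink's classes: `SimpleFileSplit`, `PartitionedSplit`, `SimpleBulkFormat`, and `PartitionAwareFormat` are hypothetical, simplified stand-ins for `FileSourceSplit`, a subclass such as a Hive split, and `BulkFormat<T, SplitT>`. It only shows that the extra `SplitT` type parameter lets a format read subclass-specific fields without casting.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical stand-in for FileSourceSplit: path, offset, and length only. */
   class SimpleFileSplit {
       private final String path;
       private final long offset;
       private final long length;
   
       SimpleFileSplit(String path, long offset, long length) {
           this.path = path;
           this.offset = offset;
           this.length = length;
       }
   
       String path() { return path; }
       long offset() { return offset; }
       long length() { return length; }
   }
   
   /** Hypothetical subclass carrying extra information (here: a partition spec). */
   class PartitionedSplit extends SimpleFileSplit {
       private final String partition;
   
       PartitionedSplit(String path, long offset, long length, String partition) {
           super(path, offset, length);
           this.partition = partition;
       }
   
       String partition() { return partition; }
   }
   
   /** Hypothetical stand-in for BulkFormat<T, SplitT>: generic over the split type. */
   interface SimpleBulkFormat<T, SplitT extends SimpleFileSplit> {
       List<T> createReader(SplitT split);
   }
   
   /** A format bound to PartitionedSplit can call partition() directly, no cast needed. */
   class PartitionAwareFormat implements SimpleBulkFormat<String, PartitionedSplit> {
       @Override
       public List<String> createReader(PartitionedSplit split) {
           List<String> records = new ArrayList<>();
           // A real reader would read split.length() bytes starting at split.offset();
           // this sketch only emits one record to show the subclass fields are reachable.
           records.add(split.partition() + ":" + split.path() + "@" + split.offset());
           return records;
       }
   }
   ```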
   
   ## Verifying this change
   
   This is a refactoring that does not change any functionality.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes** (but unreleased)
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] StephanEwen commented on a change in pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on a change in pull request #13847:
URL: https://github.com/apache/flink/pull/13847#discussion_r516196659



##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
##########
@@ -92,30 +91,22 @@
  * handed over as one.
  */
 @PublicEvolving
-public interface BulkFormat<T> extends Serializable, ResultTypeQueryable<T> {
+public interface BulkFormat<T, SplitT extends FileSourceSplit> extends Serializable, ResultTypeQueryable<T> {
 
 	/**
-	 * Creates a new reader that reads from {@code filePath} starting at {@code offset} and reads
-	 * until {@code length} bytes after the offset.
+	 * Creates a new reader that reads from the {@link FileSourceSplit#path() split's path}
+	 * starting at the {@link FileSourceSplit#offset() split's offset} and reads
+	 * {@link FileSourceSplit#length() length} bytes after the offset.
 	 */
-	BulkFormat.Reader<T> createReader(
-			Configuration config,
-			Path filePath,
-			long splitOffset,
-			long splitLength) throws IOException;
+	BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;

Review comment:
       I was torn between the two options as well:
     - Have only one method, to allow consolidating/unifying the code.
     - Have two methods, to make it explicit when a restore happens. This reduces the chance that users are unaware that restores need to be taken into account.
   
   I decided on the second variant, because ensuring that users do not accidentally overlook the restore aspects seemed more important to me here.
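   
   The chosen two-method design can be illustrated with a minimal sketch. `PositionedSplit`, `TwoMethodFormat`, and `SketchFormat` are hypothetical stand-ins, not Flink's `BulkFormat` API, and strings stand in for real readers. The point is only that a dedicated `restoreReader` entry point makes the restore case explicit, and that it can fail fast when no checkpointed position is present.
   
   ```java
   import java.util.Optional;
   
   /** Hypothetical split stand-in with an optional checkpointed reader position. */
   class PositionedSplit {
       private final String path;
       private final Long position; // null means "no checkpointed position"
   
       PositionedSplit(String path, Long position) {
           this.path = path;
           this.position = position;
       }
   
       String path() { return path; }
       Optional<Long> readerPosition() { return Optional.ofNullable(position); }
   }
   
   /** Two explicit entry points: fresh start vs. restore from a checkpoint. */
   interface TwoMethodFormat {
       /** Called for splits that have not been read before. */
       String createReader(PositionedSplit split);
   
       /** Called for splits restored from a checkpoint; implementers must honor the position. */
       String restoreReader(PositionedSplit split);
   }
   
   class SketchFormat implements TwoMethodFormat {
       @Override
       public String createReader(PositionedSplit split) {
           return "open " + split.path() + " from start";
       }
   
       @Override
       public String restoreReader(PositionedSplit split) {
           // A restore without a position indicates a caller bug, so fail fast.
           long pos = split.readerPosition()
                   .orElseThrow(() -> new IllegalStateException("restore requires a reader position"));
           return "open " + split.path() + " at record " + pos;
       }
   }
   ```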

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator;
+import org.apache.flink.connector.file.src.impl.FileSourceReader;
+import org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The base class for File Sources. The main implementation to use is the {@link FileSource},
+ * which also has the majority of the documentation.
+ *
+ * <p>To read new formats, one commonly does NOT need to extend this class, but should implement
+ * a new Format Reader (like {@link StreamFormat}, {@link BulkFormat}, {@link FileRecordFormat})
+ * and use it with the {@code FileSource}.
+ *
+ * <p>The only reason to extend this class is when a source needs a different type of
+ * <i>split</i>, meaning an extension of the {@link FileSourceSplit} to carry additional
+ * information.
+ *
+ * @param <T> The type of the events/records produced by this source.
+ * @param <SplitT> The subclass type of the FileSourceSplit used by the source implementation.
+ */
+@PublicEvolving
+public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
+		implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Path[] inputPaths;
+
+	private final FileEnumerator.Provider enumeratorFactory;
+
+	private final FileSplitAssigner.Provider assignerFactory;
+
+	private final BulkFormat<T, SplitT> readerFormat;
+
+	@Nullable
+	private final ContinuousEnumerationSettings continuousEnumerationSettings;
+
+	// ------------------------------------------------------------------------
+
+	protected AbstractFileSource(
+			final Path[] inputPaths,
+			final FileEnumerator.Provider fileEnumerator,
+			final FileSplitAssigner.Provider splitAssigner,
+			final BulkFormat<T, SplitT> readerFormat,
+			@Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) {
+
+		checkArgument(inputPaths.length > 0);
+		this.inputPaths = inputPaths;
+		this.enumeratorFactory = checkNotNull(fileEnumerator);
+		this.assignerFactory = checkNotNull(splitAssigner);
+		this.readerFormat = checkNotNull(readerFormat);
+		this.continuousEnumerationSettings = continuousEnumerationSettings;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Source API Methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Boundedness getBoundedness() {
+		return continuousEnumerationSettings == null ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+	}
+
+	@Override
+	public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) {
+		return new FileSourceReader<>(readerContext, readerFormat, readerContext.getConfiguration());
+	}
+
+	@Override
+	public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(

Review comment:
       Agreed, that can work.







[GitHub] [flink] StephanEwen commented on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-719087234


   @lirui-apache and @JingsongLi here is the extensible file source proposal.





[GitHub] [flink] flinkbot edited a comment on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-719094875


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2046761529b63e6506b8a5448f740a65fb23a339",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8630",
       "triggerID" : "2046761529b63e6506b8a5448f740a65fb23a339",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2046761529b63e6506b8a5448f740a65fb23a339 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8630) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-719158332


   e25ae2e0f95b03abad1c0a4e72e293476902ef20 Looks good to me.





[GitHub] [flink] JingsongLi commented on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-719157999


   Thanks for your work, @StephanEwen !





[GitHub] [flink] JingsongLi commented on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-720887126


   > Do you think we should merge this now and further evolve it in the master branch, if we find out we need more?
   
   +1 to merge this. I think everything in this PR is a good improvement.





[GitHub] [flink] StephanEwen commented on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-720669864


   Do you think we should merge this now and further evolve it in the master branch, if we find out we need more?





[GitHub] [flink] flinkbot commented on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-719094875


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2046761529b63e6506b8a5448f740a65fb23a339",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2046761529b63e6506b8a5448f740a65fb23a339",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2046761529b63e6506b8a5448f740a65fb23a339 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13847:
URL: https://github.com/apache/flink/pull/13847#discussion_r514847781



##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator;
+import org.apache.flink.connector.file.src.impl.FileSourceReader;
+import org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.reader.FileRecordFormat;
+import org.apache.flink.connector.file.src.reader.StreamFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The base class for File Sources. The main implementation to use is the {@link FileSource},
+ * which also has the majority of the documentation.
+ *
+ * <p>To read new formats, one commonly does NOT need to extend this class, but should implement
+ * a new Format Reader (like {@link StreamFormat}, {@link BulkFormat}, {@link FileRecordFormat})
+ * and use it with the {@code FileSource}.
+ *
+ * <p>The only reason to extend this class is when a source needs a different type of
+ * <i>split</i>, meaning an extension of the {@link FileSourceSplit} to carry additional
+ * information.
+ *
+ * @param <T> The type of the events/records produced by this source.
+ * @param <SplitT> The subclass type of the FileSourceSplit used by the source implementation.
+ */
+@PublicEvolving
+public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
+		implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Path[] inputPaths;
+
+	private final FileEnumerator.Provider enumeratorFactory;
+
+	private final FileSplitAssigner.Provider assignerFactory;
+
+	private final BulkFormat<T, SplitT> readerFormat;
+
+	@Nullable
+	private final ContinuousEnumerationSettings continuousEnumerationSettings;
+
+	// ------------------------------------------------------------------------
+
+	protected AbstractFileSource(
+			final Path[] inputPaths,
+			final FileEnumerator.Provider fileEnumerator,
+			final FileSplitAssigner.Provider splitAssigner,
+			final BulkFormat<T, SplitT> readerFormat,
+			@Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) {
+
+		checkArgument(inputPaths.length > 0);
+		this.inputPaths = inputPaths;
+		this.enumeratorFactory = checkNotNull(fileEnumerator);
+		this.assignerFactory = checkNotNull(splitAssigner);
+		this.readerFormat = checkNotNull(readerFormat);
+		this.continuousEnumerationSettings = continuousEnumerationSettings;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Source API Methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Boundedness getBoundedness() {
+		return continuousEnumerationSettings == null ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+	}
+
+	@Override
+	public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) {
+		return new FileSourceReader<>(readerContext, readerFormat, readerContext.getConfiguration());
+	}
+
+	@Override
+	public SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(

Review comment:
       I can imagine subclasses overriding this method:
   Iceberg: override it entirely and `return new IcebergEnumerator(...)`.
   Hive: 
   ```
   if (continuousEnumerationSettings != null && isPartitionedTable) {
       return new HiveSplitEnumerator(...);
   } else {
       return super.createEnumerator(...);
   }
   ```
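   
   The Hive-style override with a fallback to the base class can be sketched in plain Java. All names here (`SplitEnumeratorSketch`, `FileSourceSketch`, `HiveLikeSource`, `isPartitionedTable`) are hypothetical stand-ins: the real `AbstractFileSource.createEnumerator` takes an enumerator context and returns a `SplitEnumerator`, which this sketch reduces to a one-method interface. Because `continuousEnumerationSettings` is `private` in the diff above, the sketch assumes a protected accessor for subclasses.
   
   ```java
   /** Hypothetical, simplified enumerator abstraction. */
   interface SplitEnumeratorSketch {
       String name();
   }
   
   /** Stand-in for the base source: bounded when there are no continuous settings. */
   class FileSourceSketch {
       private final Object continuousEnumerationSettings; // null means bounded
   
       FileSourceSketch(Object continuousEnumerationSettings) {
           this.continuousEnumerationSettings = continuousEnumerationSettings;
       }
   
       /** Assumed accessor so that subclasses can inspect the settings. */
       protected Object getContinuousEnumerationSettings() {
           return continuousEnumerationSettings;
       }
   
       SplitEnumeratorSketch createEnumerator() {
           if (continuousEnumerationSettings == null) {
               return () -> "StaticFileSplitEnumerator";
           }
           return () -> "ContinuousFileSplitEnumerator";
       }
   }
   
   /** Hive-like subclass: custom enumerator only for continuous reads of partitioned tables. */
   class HiveLikeSource extends FileSourceSketch {
       private final boolean isPartitionedTable;
   
       HiveLikeSource(Object settings, boolean isPartitionedTable) {
           super(settings);
           this.isPartitionedTable = isPartitionedTable;
       }
   
       @Override
       SplitEnumeratorSketch createEnumerator() {
           if (getContinuousEnumerationSettings() != null && isPartitionedTable) {
               return () -> "HiveSplitEnumerator";
           }
           // Otherwise fall back to the base class's static/continuous enumerators.
           return super.createEnumerator();
       }
   }
   ```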








[GitHub] [flink] asfgit closed pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #13847:
URL: https://github.com/apache/flink/pull/13847


   





[GitHub] [flink] JingsongLi commented on a change in pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13847:
URL: https://github.com/apache/flink/pull/13847#discussion_r514822513



##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/BulkFormat.java
##########
@@ -92,30 +91,22 @@
  * handed over as one.
  */
 @PublicEvolving
-public interface BulkFormat<T> extends Serializable, ResultTypeQueryable<T> {
+public interface BulkFormat<T, SplitT extends FileSourceSplit> extends Serializable, ResultTypeQueryable<T> {
 
 	/**
-	 * Creates a new reader that reads from {@code filePath} starting at {@code offset} and reads
-	 * until {@code length} bytes after the offset.
+	 * Creates a new reader that reads from the {@link FileSourceSplit#path() split's path}
+	 * starting at the {@link FileSourceSplit#offset() split's offset} and reads
+	 * {@link FileSourceSplit#length() length} bytes after the offset.
 	 */
-	BulkFormat.Reader<T> createReader(
-			Configuration config,
-			Path filePath,
-			long splitOffset,
-			long splitLength) throws IOException;
+	BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;

Review comment:
       It looks like we could merge `createReader` and `restoreReader`: the format can decide based on `FileSourceSplit.getReaderPosition`, as `FileSourceSplitReader.checkSplitOrStartNext` does.
   
   I don't have a strong preference: a separate `restore` method looks clearer, but it may require some assertions, as `StreamFormatAdapter` has.
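   
   For comparison, the single-method alternative suggested here might look like the following sketch. `RestorableSplit` and `SingleMethodFormat` are hypothetical stand-ins, and `readerPosition()` only plays the role of `FileSourceSplit.getReaderPosition`; the real types differ. One method consolidates both paths, but nothing in the signature reminds an implementer to handle the restore branch.
   
   ```java
   import java.util.Optional;
   
   /** Hypothetical split stand-in; the position is present only when restored from a checkpoint. */
   class RestorableSplit {
       private final String path;
       private final Optional<Long> position;
   
       RestorableSplit(String path, Optional<Long> position) {
           this.path = path;
           this.position = position;
       }
   
       String path() { return path; }
       Optional<Long> readerPosition() { return position; }
   }
   
   /** One entry point that handles both fresh reads and restores. */
   class SingleMethodFormat {
       String createReader(RestorableSplit split) {
           // The format itself must check for a checkpointed position;
           // forgetting this branch would silently re-read the split from the start.
           return split.readerPosition()
                   .map(pos -> "open " + split.path() + " at record " + pos)
                   .orElse("open " + split.path() + " from start");
       }
   }
   ```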







[GitHub] [flink] flinkbot commented on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-719086783


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 2046761529b63e6506b8a5448f740a65fb23a339 (Thu Oct 29 23:47:03 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] JingsongLi commented on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-719158075


   5ada6ad44dca511b1ef4b22fd3d692eb7f86577d Looks good to me.





[GitHub] [flink] flinkbot edited a comment on pull request #13847: [FLINK-19799][connector files] FileSource supports extensions with subclasses of FileSourceSplit

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13847:
URL: https://github.com/apache/flink/pull/13847#issuecomment-719094875


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "2046761529b63e6506b8a5448f740a65fb23a339",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8630",
       "triggerID" : "2046761529b63e6506b8a5448f740a65fb23a339",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2046761529b63e6506b8a5448f740a65fb23a339 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8630) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

