Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/13 06:19:45 UTC

[GitHub] [flink] JingsongLi opened a new pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

JingsongLi opened a new pull request #13605:
URL: https://github.com/apache/flink/pull/13605


   
   ## What is the purpose of the change & Brief change log
   
   Introduce `BulkFormatFactory`: `BulkFormat<T> createBulkFormat(context)`.
   The filesystem connector uses this factory to create a `BulkFormat` and reads files through the new `FileSource`.
   `FileSystemSource` gives precedence to `BulkFormatFactory` when reading files.
   
   Introduce `BulkWriterFactory`: `BulkWriter.Factory<T> createBulkWriter(context)`.
   The filesystem connector uses this factory to create a `BulkWriter` and writes files through the streaming file sink.
   `FileSystemSink` gives precedence to `BulkWriterFactory` when writing files.
   
   Introduce `ParquetBulkFormatFactory` to implement the new interfaces.
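   
   A minimal, hypothetical sketch of the shape described above (names and signatures here are illustrative assumptions; the interfaces in this PR extend the generic `DecodingFormatFactory`/`EncodingFormatFactory`, so the final signatures may differ):
   
   ```java
   // Hedged sketch only: mirrors the one-line descriptions above, not the merged code.
   import org.apache.flink.api.common.serialization.BulkWriter;
   import org.apache.flink.connector.file.src.reader.BulkFormat;
   import org.apache.flink.table.connector.sink.DynamicTableSink;
   import org.apache.flink.table.connector.source.DynamicTableSource;
   import org.apache.flink.table.data.RowData;

   /** Reader side: creates the BulkFormat that the new FileSource consumes. */
   interface BulkFormatFactorySketch {
       BulkFormat<RowData> createBulkFormat(DynamicTableSource.Context context);
   }

   /** Writer side: creates the BulkWriter factory that the streaming file sink consumes. */
   interface BulkWriterFactorySketch {
       BulkWriter.Factory<RowData> createBulkWriter(DynamicTableSink.Context context);
   }
   ```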
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as:
   - `ParquetFsStreamingSinkITCase`
   - `ParquetFileSystemITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 315b69fafef254ff273bc36c713d28166260e4c6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8362) 
   * 05116d4768052821a37adfa725e60e40f4f71176 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 05116d4768052821a37adfa725e60e40f4f71176 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8403) 
   * 61fed9445fbd820423b73ef520d5f9ffc9e0606d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707517857


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit a9df8a1384eb6306656b4fd952edd4be5d7a857d (Tue Oct 13 06:22:59 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   <details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] JingsongLi merged pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #13605:
URL: https://github.com/apache/flink/pull/13605


   





[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513200210



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -199,15 +202,32 @@ private Path toStagingPath() {
 	}
 
 	@SuppressWarnings("unchecked")
-	private OutputFormatFactory<RowData> createOutputFormatFactory() {
-		Object writer = createWriter();
+	private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) {
+		Object writer = createWriter(sinkContext);
 		return writer instanceof Encoder ?
 				path -> createEncoderOutputFormat((Encoder<RowData>) writer, path) :
 				path -> createBulkWriterOutputFormat((BulkWriter.Factory<RowData>) writer, path);
 	}
 
-	private Object createWriter() {
-		FileSystemFormatFactory formatFactory = createFormatFactory(tableOptions);
+	private DataType getFormatDataType() {
+		TableSchema.Builder builder = TableSchema.builder();
+		schema.getTableColumns().forEach(column -> {
+			if (!partitionKeys.contains(column.getName())) {
+				builder.add(column);
+			}
+		});
+		return builder.build().toRowDataType();
+	}
+
+	private Object createWriter(Context sinkContext) {
+		@SuppressWarnings("rawtypes")
+		Optional<EncodingFormat> encodingFormat = discoverOptionalEncodingFormat(BulkWriterFactory.class)
+				.map(Optional::of).orElseGet(() -> discoverOptionalEncodingFormat(EncoderFactory.class));
+		if (encodingFormat.isPresent()) {
+			return encodingFormat.get().createRuntimeEncoder(sinkContext, getFormatDataType());
+		}
+
+		FileSystemFormatFactory formatFactory = createFormatFactory();

Review comment:
       https://issues.apache.org/jira/browse/FLINK-19845







[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 96468e31f7614ef314c655f97a63fcabe83505a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8499) 
   * 5ef88c8f400b6c2ef5f5ca6bbd750c17f27137c7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 8beabe2e1b04afe449610c44f4c376909b463e10 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493) 
   * 76ff23e27070dc54e6af85dd91c2742457621aa9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7565) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513200897



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -88,7 +92,16 @@ private FileSystemTableSource(
 	}
 
 	@Override
-	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+		Optional<BulkDecodingFormat<RowData>> bulkDecodingFormat = discoverBulkDecodingFormat();
+
+		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
+			// When this table has no partition, just return an empty source.
+			return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
+		} else if (bulkDecodingFormat.isPresent()) {
+			return SourceProvider.of(createBulkFormatSource(bulkDecodingFormat.get(), scanContext));
+		}
+
 		return new DataStreamScanProvider() {
 			@Override
 			public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {

Review comment:
       The `ContinuousFileMonitoringFunction` cannot accept multiple paths, and the default `StreamEnv.createInput` will create that continuous monitoring function.
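    
       For context, a rough sketch of the alternative path taken here (reusing the builder calls from the diff above; generics and packages are assumptions and may differ between Flink versions): the new `FileSource` is handed every partition directory up front, which is exactly what a partitioned table needs.
    
    ```java
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.data.RowData;

    // Hedged sketch: unlike the legacy continuous-monitoring source, which is
    // bound to a single path, the new FileSource is built with all partition
    // directories at once.
    final class MultiPathSourceSketch {
        static FileSource<RowData> forPartitions(BulkFormat<RowData> bulkFormat, Path[] partitionPaths) {
            return FileSource.forBulkFileFormat(bulkFormat, partitionPaths).build();
        }
    }
    ```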







[GitHub] [flink] JingsongLi commented on pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-708865098


   After discussing this with Jark, I think we should try `EncodingFormatFactory` and `DecodingFormatFactory`.





[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 76ff23e27070dc54e6af85dd91c2742457621aa9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7565) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 96468e31f7614ef314c655f97a63fcabe83505a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8499) 
   * 5ef88c8f400b6c2ef5f5ca6bbd750c17f27137c7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8549) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 76ff23e27070dc54e6af85dd91c2742457621aa9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7565) 
   * 315b69fafef254ff273bc36c713d28166260e4c6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 8beabe2e1b04afe449610c44f4c376909b463e10 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513201380



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
##########
@@ -93,124 +117,17 @@ private static Configuration getParquetConfiguration(ReadableConfig options) {
 	}
 
 	@Override
-	public InputFormat<RowData, ?> createReader(ReaderContext context) {
-		return new ParquetInputFormat(
-				context.getPaths(),
-				context.getSchema().getFieldNames(),
-				context.getSchema().getFieldDataTypes(),
-				context.getProjectFields(),
-				context.getDefaultPartName(),
-				context.getPushedDownLimit(),
-				getParquetConfiguration(context.getFormatOptions()),
-				context.getFormatOptions().get(UTC_TIMEZONE));
+	public String factoryIdentifier() {
+		return "parquet";
 	}
 
 	@Override
-	public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
-		return Optional.of(ParquetRowDataBuilder.createWriterFactory(
-				RowType.of(Arrays.stream(context.getFormatFieldTypes())
-								.map(DataType::getLogicalType)
-								.toArray(LogicalType[]::new),
-						context.getFormatFieldNames()),
-				getParquetConfiguration(context.getFormatOptions()),
-				context.getFormatOptions().get(UTC_TIMEZONE)));
+	public Set<ConfigOption<?>> requiredOptions() {
+		return new HashSet<>();
 	}
 
 	@Override
-	public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
-		return Optional.empty();
-	}
-
-	/**
-	 * An implementation of {@link ParquetInputFormat} to read {@link RowData} records
-	 * from Parquet files.
-	 */
-	public static class ParquetInputFormat extends FileInputFormat<RowData> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final String[] fullFieldNames;
-		private final DataType[] fullFieldTypes;
-		private final int[] selectedFields;
-		private final String partDefaultName;
-		private final boolean utcTimestamp;
-		private final SerializableConfiguration conf;
-		private final long limit;
-
-		private transient ParquetColumnarRowSplitReader reader;
-		private transient long currentReadCount;
-
-		public ParquetInputFormat(
-				Path[] paths,
-				String[] fullFieldNames,
-				DataType[] fullFieldTypes,
-				int[] selectedFields,
-				String partDefaultName,
-				long limit,
-				Configuration conf,
-				boolean utcTimestamp) {
-			super.setFilePaths(paths);
-			this.limit = limit;
-			this.partDefaultName = partDefaultName;
-			this.fullFieldNames = fullFieldNames;
-			this.fullFieldTypes = fullFieldTypes;
-			this.selectedFields = selectedFields;
-			this.conf = new SerializableConfiguration(conf);
-			this.utcTimestamp = utcTimestamp;
-		}
-
-		@Override
-		public void open(FileInputSplit fileSplit) throws IOException {
-			// generate partition specs.
-			List<String> fieldNameList = Arrays.asList(fullFieldNames);
-			LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(
-					fileSplit.getPath());
-			LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
-			partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
-					partDefaultName.equals(v) ? null : v,
-					fullFieldTypes[fieldNameList.indexOf(k)])));
-
-			this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
-					utcTimestamp,
-					true,
-					conf.conf(),
-					fullFieldNames,
-					fullFieldTypes,
-					partObjects,
-					selectedFields,
-					DEFAULT_SIZE,
-					new Path(fileSplit.getPath().toString()),
-					fileSplit.getStart(),
-					fileSplit.getLength());
-			this.currentReadCount = 0L;
-		}
-
-		@Override
-		public boolean supportsMultiPaths() {
-			return true;
-		}
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			if (currentReadCount >= limit) {
-				return true;
-			} else {
-				return reader.reachedEnd();
-			}
-		}
-
-		@Override
-		public RowData nextRecord(RowData reuse) {
-			currentReadCount++;
-			return reader.nextRecord();
-		}
-
-		@Override
-		public void close() throws IOException {
-			if (reader != null) {
-				this.reader.close();
-			}
-			this.reader = null;
-		}
+	public Set<ConfigOption<?>> optionalOptions() {
+		return new HashSet<>();

Review comment:
       We can't enumerate all of its options, so we can't validate them; we simply don't add any here.
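    
       To illustrate: every key under the format prefix (`parquet.`, stripped by the `DelegatingConfiguration` shown elsewhere in this PR) is forwarded verbatim to the Hadoop configuration, so there is no fixed list to declare. A hypothetical sketch of that pass-through (helper and names are assumptions, not the PR's code):
    
    ```java
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    // Hypothetical helper showing why optionalOptions() stays empty: arbitrary
    // user-supplied format keys are copied as-is, so they cannot be enumerated
    // and validated as individual ConfigOptions.
    final class ParquetOptionPassThroughSketch {
        static Configuration toHadoopConf(Map<String, String> formatOptions) {
            Configuration hadoopConf = new Configuration();
            formatOptions.forEach(hadoopConf::set); // no whitelist of keys exists
            return hadoopConf;
        }
    }
    ```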







[GitHub] [flink] JingsongLi commented on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-717196631


   CC: @wuchong @lirui-apache 





[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513208619



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -108,13 +121,39 @@ public boolean isBounded() {
 		};
 	}
 
-	private InputFormat<RowData, ?> getInputFormat() {
-		// When this table has no partition, just return a empty source.
-		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
-			return new CollectionInputFormat<>(new ArrayList<>(), null);
+	private FileSource<RowData> createBulkFormatSource(
+			BulkDecodingFormat<RowData> decodingFormat, ScanContext scanContext) {
+		decodingFormat.applyLimit(pushedDownLimit());
+		decodingFormat.applyFilters(pushedDownFilters());
+		BulkFormat<RowData> bulkFormat = decodingFormat.createRuntimeDecoder(
+				scanContext, getProducedDataType());
+		FileSource.FileSourceBuilder<RowData> builder = FileSource
+				.forBulkFileFormat(bulkFormat, paths());
+		return builder.build();
+	}
+
+	private Path[] paths() {
+		if (partitionKeys.isEmpty()) {
+			return new Path[] {path};
+		} else {
+			return getOrFetchPartitions().stream()
+					.map(FileSystemTableSource.this::toFullLinkedPartSpec)
+					.map(PartitionPathUtils::generatePartitionPath)
+					.map(n -> new Path(path, n))
+					.toArray(Path[]::new);
 		}
+	}
+
+	private long pushedDownLimit() {
+		return limit == null ? Long.MAX_VALUE : limit;

Review comment:
       We can do something like this:
   ```
   		if (limit != null) {
   			decodingFormat.applyLimit(limit);
   		}
   		if (filters != null && filters.size() > 0) {
   			decodingFormat.applyFilters(filters);
   		}
   ```







[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * fa0dc913f56aa2567c30073c044f688f9ed74fee Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8485) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 05116d4768052821a37adfa725e60e40f4f71176 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8403) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513201092



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -108,13 +121,39 @@ public boolean isBounded() {
 		};
 	}
 
-	private InputFormat<RowData, ?> getInputFormat() {
-		// When this table has no partition, just return a empty source.
-		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
-			return new CollectionInputFormat<>(new ArrayList<>(), null);
+	private FileSource<RowData> createBulkFormatSource(
+			BulkDecodingFormat<RowData> decodingFormat, ScanContext scanContext) {
+		decodingFormat.applyLimit(pushedDownLimit());
+		decodingFormat.applyFilters(pushedDownFilters());
+		BulkFormat<RowData> bulkFormat = decodingFormat.createRuntimeDecoder(
+				scanContext, getProducedDataType());
+		FileSource.FileSourceBuilder<RowData> builder = FileSource
+				.forBulkFileFormat(bulkFormat, paths());
+		return builder.build();
+	}
+
+	private Path[] paths() {
+		if (partitionKeys.isEmpty()) {
+			return new Path[] {path};
+		} else {
+			return getOrFetchPartitions().stream()
+					.map(FileSystemTableSource.this::toFullLinkedPartSpec)
+					.map(PartitionPathUtils::generatePartitionPath)
+					.map(n -> new Path(path, n))
+					.toArray(Path[]::new);
 		}
+	}
+
+	private long pushedDownLimit() {
+		return limit == null ? Long.MAX_VALUE : limit;

Review comment:
       We can keep the null value.







[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * fa0dc913f56aa2567c30073c044f688f9ed74fee Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8485) 
   * 96468e31f7614ef314c655f97a63fcabe83505a8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8499) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513164938



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/BulkFormatFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link BulkFormat} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface BulkFormatFactory extends DecodingFormatFactory<BulkFormat<RowData>> {
+	// interface is used for discovery but is already fully specified by the generics

Review comment:
       Remove this?

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
##########
@@ -93,124 +117,17 @@ private static Configuration getParquetConfiguration(ReadableConfig options) {
 	}
 
 	@Override
-	public InputFormat<RowData, ?> createReader(ReaderContext context) {
-		return new ParquetInputFormat(
-				context.getPaths(),
-				context.getSchema().getFieldNames(),
-				context.getSchema().getFieldDataTypes(),
-				context.getProjectFields(),
-				context.getDefaultPartName(),
-				context.getPushedDownLimit(),
-				getParquetConfiguration(context.getFormatOptions()),
-				context.getFormatOptions().get(UTC_TIMEZONE));
+	public String factoryIdentifier() {
+		return "parquet";
 	}
 
 	@Override
-	public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
-		return Optional.of(ParquetRowDataBuilder.createWriterFactory(
-				RowType.of(Arrays.stream(context.getFormatFieldTypes())
-								.map(DataType::getLogicalType)
-								.toArray(LogicalType[]::new),
-						context.getFormatFieldNames()),
-				getParquetConfiguration(context.getFormatOptions()),
-				context.getFormatOptions().get(UTC_TIMEZONE)));
+	public Set<ConfigOption<?>> requiredOptions() {
+		return new HashSet<>();
 	}
 
 	@Override
-	public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
-		return Optional.empty();
-	}
-
-	/**
-	 * An implementation of {@link ParquetInputFormat} to read {@link RowData} records
-	 * from Parquet files.
-	 */
-	public static class ParquetInputFormat extends FileInputFormat<RowData> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final String[] fullFieldNames;
-		private final DataType[] fullFieldTypes;
-		private final int[] selectedFields;
-		private final String partDefaultName;
-		private final boolean utcTimestamp;
-		private final SerializableConfiguration conf;
-		private final long limit;
-
-		private transient ParquetColumnarRowSplitReader reader;
-		private transient long currentReadCount;
-
-		public ParquetInputFormat(
-				Path[] paths,
-				String[] fullFieldNames,
-				DataType[] fullFieldTypes,
-				int[] selectedFields,
-				String partDefaultName,
-				long limit,
-				Configuration conf,
-				boolean utcTimestamp) {
-			super.setFilePaths(paths);
-			this.limit = limit;
-			this.partDefaultName = partDefaultName;
-			this.fullFieldNames = fullFieldNames;
-			this.fullFieldTypes = fullFieldTypes;
-			this.selectedFields = selectedFields;
-			this.conf = new SerializableConfiguration(conf);
-			this.utcTimestamp = utcTimestamp;
-		}
-
-		@Override
-		public void open(FileInputSplit fileSplit) throws IOException {
-			// generate partition specs.
-			List<String> fieldNameList = Arrays.asList(fullFieldNames);
-			LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(
-					fileSplit.getPath());
-			LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
-			partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
-					partDefaultName.equals(v) ? null : v,
-					fullFieldTypes[fieldNameList.indexOf(k)])));
-
-			this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
-					utcTimestamp,
-					true,
-					conf.conf(),
-					fullFieldNames,
-					fullFieldTypes,
-					partObjects,
-					selectedFields,
-					DEFAULT_SIZE,
-					new Path(fileSplit.getPath().toString()),
-					fileSplit.getStart(),
-					fileSplit.getLength());
-			this.currentReadCount = 0L;
-		}
-
-		@Override
-		public boolean supportsMultiPaths() {
-			return true;
-		}
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			if (currentReadCount >= limit) {
-				return true;
-			} else {
-				return reader.reachedEnd();
-			}
-		}
-
-		@Override
-		public RowData nextRecord(RowData reuse) {
-			currentReadCount++;
-			return reader.nextRecord();
-		}
-
-		@Override
-		public void close() throws IOException {
-			if (reader != null) {
-				this.reader.close();
-			}
-			this.reader = null;
-		}
+	public Set<ConfigOption<?>> optionalOptions() {
+		return new HashSet<>();

Review comment:
       Why is the `UTC_TIMEZONE` option not in the set?
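       For example, something like this could work (just a sketch, assuming the `UTC_TIMEZONE` constant from this factory stays in scope):
   ```java
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
   	// Register the format-specific option so option validation knows about it.
   	Set<ConfigOption<?>> options = new HashSet<>();
   	options.add(UTC_TIMEZONE);
   	return options;
   }
   ```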

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##########
@@ -53,15 +62,75 @@
 		context.getCatalogTable().getOptions().forEach(tableOptions::setString);
 		this.schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 		this.partitionKeys = context.getCatalogTable().getPartitionKeys();
-		this.path = new Path(context.getCatalogTable().getOptions().getOrDefault(PATH.key(), PATH.defaultValue()));
-		this.defaultPartName = context.getCatalogTable().getOptions().getOrDefault(
-				PARTITION_DEFAULT_NAME.key(), PARTITION_DEFAULT_NAME.defaultValue());
+		this.path = new Path(tableOptions.get(PATH));
+		this.defaultPartName = tableOptions.get(PARTITION_DEFAULT_NAME);
 	}
 
-	static FileSystemFormatFactory createFormatFactory(ReadableConfig tableOptions) {
+	ReadableConfig formatOptions(String identifier) {
+		return new DelegatingConfiguration(tableOptions, identifier + ".");
+	}
+
+	FileSystemFormatFactory createFormatFactory() {
 		return FactoryUtil.discoverFactory(
 				Thread.currentThread().getContextClassLoader(),
 				FileSystemFormatFactory.class,
 				tableOptions.get(FactoryUtil.FORMAT));
 	}
+
+	@SuppressWarnings("rawtypes")
+	<F extends EncodingFormatFactory<?>> Optional<EncodingFormat> discoverOptionalEncodingFormat(

Review comment:
       Why not reuse `FactoryUtil.TableFactoryHelper#discoverOptionalEncodingFormat`?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -88,7 +92,16 @@ private FileSystemTableSource(
 	}
 
 	@Override
-	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+		Optional<BulkDecodingFormat<RowData>> bulkDecodingFormat = discoverBulkDecodingFormat();
+
+		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
+			// When this table has no partition, just return a empty source.
+			return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
+		} else if (bulkDecodingFormat.isPresent()) {
+			return SourceProvider.of(createBulkFormatSource(bulkDecodingFormat.get(), scanContext));
+		}
+
 		return new DataStreamScanProvider() {
 			@Override
 			public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {

Review comment:
       Why should we avoid using `ContinuousFileMonitoringFunction` here? And why not return a `SourceFunctionProvider` of `InputFormatSourceFunction` directly?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/BulkFormatFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link BulkFormat} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface BulkFormatFactory extends DecodingFormatFactory<BulkFormat<RowData>> {

Review comment:
       How about naming this `BulkReaderFormatFactory`? That would align better with `BulkWriterFormatFactory`.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/EncoderFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link Encoder} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface EncoderFactory extends EncodingFormatFactory<Encoder<RowData>> {

Review comment:
       Do we really need this factory? It seems to duplicate `SerializationSchema`. I'm afraid we will introduce a lot of duplicated code if we can't reuse the existing `SerializationFormatFactory`s.
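       For reference, a rough sketch of how a `SerializationSchema` could be adapted to an `Encoder` (the wrapper class name is made up here, and the schema's `open()` lifecycle is ignored):
   ```java
   import org.apache.flink.api.common.serialization.Encoder;
   import org.apache.flink.api.common.serialization.SerializationSchema;
   import org.apache.flink.table.data.RowData;
   
   import java.io.IOException;
   import java.io.OutputStream;
   
   /** Illustrative adapter only: writes each serialized record followed by a line delimiter. */
   public class SerializationSchemaEncoder implements Encoder<RowData> {
   
   	private final SerializationSchema<RowData> serializationSchema;
   
   	public SerializationSchemaEncoder(SerializationSchema<RowData> serializationSchema) {
   		this.serializationSchema = serializationSchema;
   	}
   
   	@Override
   	public void encode(RowData element, OutputStream stream) throws IOException {
   		stream.write(serializationSchema.serialize(element));
   		stream.write('\n');
   	}
   }
   ```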

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/BulkWriterFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link BulkWriter.Factory} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface BulkWriterFactory extends EncodingFormatFactory<BulkWriter.Factory<RowData>> {

Review comment:
       How about naming this `BulkWriterFormatFactory`? That would align better with `BulkReaderFormatFactory`.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -199,15 +202,32 @@ private Path toStagingPath() {
 	}
 
 	@SuppressWarnings("unchecked")
-	private OutputFormatFactory<RowData> createOutputFormatFactory() {
-		Object writer = createWriter();
+	private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) {
+		Object writer = createWriter(sinkContext);
 		return writer instanceof Encoder ?
 				path -> createEncoderOutputFormat((Encoder<RowData>) writer, path) :
 				path -> createBulkWriterOutputFormat((BulkWriter.Factory<RowData>) writer, path);
 	}
 
-	private Object createWriter() {
-		FileSystemFormatFactory formatFactory = createFormatFactory(tableOptions);
+	private DataType getFormatDataType() {
+		TableSchema.Builder builder = TableSchema.builder();
+		schema.getTableColumns().forEach(column -> {
+			if (!partitionKeys.contains(column.getName())) {
+				builder.add(column);
+			}
+		});
+		return builder.build().toRowDataType();
+	}
+
+	private Object createWriter(Context sinkContext) {
+		@SuppressWarnings("rawtypes")
+		Optional<EncodingFormat> encodingFormat = discoverOptionalEncodingFormat(BulkWriterFactory.class)
+				.map(Optional::of).orElseGet(() -> discoverOptionalEncodingFormat(EncoderFactory.class));
+		if (encodingFormat.isPresent()) {
+			return encodingFormat.get().createRuntimeEncoder(sinkContext, getFormatDataType());
+		}
+
+		FileSystemFormatFactory formatFactory = createFormatFactory();

Review comment:
       Why is this still needed? Do we need to migrate all the formats to use `EncodingFormatFactory` and `DecodingFormatFactory` before we can remove this code? If yes, could you create an issue for that?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -108,13 +121,39 @@ public boolean isBounded() {
 		};
 	}
 
-	private InputFormat<RowData, ?> getInputFormat() {
-		// When this table has no partition, just return a empty source.
-		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
-			return new CollectionInputFormat<>(new ArrayList<>(), null);
+	private FileSource<RowData> createBulkFormatSource(
+			BulkDecodingFormat<RowData> decodingFormat, ScanContext scanContext) {
+		decodingFormat.applyLimit(pushedDownLimit());
+		decodingFormat.applyFilters(pushedDownFilters());
+		BulkFormat<RowData> bulkFormat = decodingFormat.createRuntimeDecoder(
+				scanContext, getProducedDataType());
+		FileSource.FileSourceBuilder<RowData> builder = FileSource
+				.forBulkFileFormat(bulkFormat, paths());
+		return builder.build();
+	}
+
+	private Path[] paths() {
+		if (partitionKeys.isEmpty()) {
+			return new Path[] {path};
+		} else {
+			return getOrFetchPartitions().stream()
+					.map(FileSystemTableSource.this::toFullLinkedPartSpec)
+					.map(PartitionPathUtils::generatePartitionPath)
+					.map(n -> new Path(path, n))
+					.toArray(Path[]::new);
 		}
+	}
+
+	private long pushedDownLimit() {
+		return limit == null ? Long.MAX_VALUE : limit;

Review comment:
       I think `Long.MAX_VALUE` can't really represent "no limit", right?
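       A sketch of only pushing an explicit limit down, using the fields from this diff:
   ```java
   // Only apply the limit when one was actually pushed down, instead of
   // encoding "no limit" as Long.MAX_VALUE.
   if (limit != null) {
   	decodingFormat.applyLimit(limit);
   }
   ```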




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513207394



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -88,7 +92,16 @@ private FileSystemTableSource(
 	}
 
 	@Override
-	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+		Optional<BulkDecodingFormat<RowData>> bulkDecodingFormat = discoverBulkDecodingFormat();
+
+		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
+			// When this table has no partition, just return a empty source.
+			return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
+		} else if (bulkDecodingFormat.isPresent()) {
+			return SourceProvider.of(createBulkFormatSource(bulkDecodingFormat.get(), scanContext));
+		}
+
 		return new DataStreamScanProvider() {
 			@Override
 			public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {

Review comment:
       Indeed, we can!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-717723912


   Thanks a lot for your review! @wuchong 
   Wrapping `SerializationSchema` and `DeserializationSchema` for csv and json is a very interesting idea; we can finish it in the next JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513221615



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##########
@@ -53,15 +62,75 @@
 		context.getCatalogTable().getOptions().forEach(tableOptions::setString);
 		this.schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 		this.partitionKeys = context.getCatalogTable().getPartitionKeys();
-		this.path = new Path(context.getCatalogTable().getOptions().getOrDefault(PATH.key(), PATH.defaultValue()));
-		this.defaultPartName = context.getCatalogTable().getOptions().getOrDefault(
-				PARTITION_DEFAULT_NAME.key(), PARTITION_DEFAULT_NAME.defaultValue());
+		this.path = new Path(tableOptions.get(PATH));
+		this.defaultPartName = tableOptions.get(PARTITION_DEFAULT_NAME);
 	}
 
-	static FileSystemFormatFactory createFormatFactory(ReadableConfig tableOptions) {
+	ReadableConfig formatOptions(String identifier) {
+		return new DelegatingConfiguration(tableOptions, identifier + ".");
+	}
+
+	FileSystemFormatFactory createFormatFactory() {
 		return FactoryUtil.discoverFactory(
 				Thread.currentThread().getContextClassLoader(),
 				FileSystemFormatFactory.class,
 				tableOptions.get(FactoryUtil.FORMAT));
 	}
+
+	@SuppressWarnings("rawtypes")
+	<F extends EncodingFormatFactory<?>> Optional<EncodingFormat> discoverOptionalEncodingFormat(

Review comment:
       I will move this logic to `FileSystemTableFactory` and use `FactoryUtil` to create the formats.
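       Roughly along these lines (just a sketch of the direction; it assumes the corresponding `discoverOptionalDecodingFormat` helper and may change in the final patch):
   ```java
   // Inside FileSystemTableFactory#createDynamicTableSource/Sink (fragment, names illustrative):
   FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
   
   // Prefer the new bulk format factories, fall back to the legacy FileSystemFormatFactory.
   Optional<DecodingFormat<BulkFormat<RowData>>> bulkReaderFormat =
   		helper.discoverOptionalDecodingFormat(BulkFormatFactory.class, FactoryUtil.FORMAT);
   Optional<EncodingFormat<BulkWriter.Factory<RowData>>> bulkWriterFormat =
   		helper.discoverOptionalEncodingFormat(BulkWriterFactory.class, FactoryUtil.FORMAT);
   ```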




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493",
       "triggerID" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7565",
       "triggerID" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8362",
       "triggerID" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "05116d4768052821a37adfa725e60e40f4f71176",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8403",
       "triggerID" : "05116d4768052821a37adfa725e60e40f4f71176",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61fed9445fbd820423b73ef520d5f9ffc9e0606d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8475",
       "triggerID" : "61fed9445fbd820423b73ef520d5f9ffc9e0606d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa0dc913f56aa2567c30073c044f688f9ed74fee",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8485",
       "triggerID" : "fa0dc913f56aa2567c30073c044f688f9ed74fee",
       "triggerType" : "PUSH"
     }, {
       "hash" : "96468e31f7614ef314c655f97a63fcabe83505a8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "96468e31f7614ef314c655f97a63fcabe83505a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * fa0dc913f56aa2567c30073c044f688f9ed74fee Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8485) 
   * 96468e31f7614ef314c655f97a63fcabe83505a8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513199634



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/EncoderFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link Encoder} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface EncoderFactory extends EncodingFormatFactory<Encoder<RowData>> {

Review comment:
       I think you are right; we can try to wrap `SerializationSchema` and `DeserializationSchema`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493",
       "triggerID" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7565",
       "triggerID" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8362",
       "triggerID" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "05116d4768052821a37adfa725e60e40f4f71176",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8403",
       "triggerID" : "05116d4768052821a37adfa725e60e40f4f71176",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61fed9445fbd820423b73ef520d5f9ffc9e0606d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8475",
       "triggerID" : "61fed9445fbd820423b73ef520d5f9ffc9e0606d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa0dc913f56aa2567c30073c044f688f9ed74fee",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8485",
       "triggerID" : "fa0dc913f56aa2567c30073c044f688f9ed74fee",
       "triggerType" : "PUSH"
     }, {
       "hash" : "96468e31f7614ef314c655f97a63fcabe83505a8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8499",
       "triggerID" : "96468e31f7614ef314c655f97a63fcabe83505a8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5ef88c8f400b6c2ef5f5ca6bbd750c17f27137c7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8549",
       "triggerID" : "5ef88c8f400b6c2ef5f5ca6bbd750c17f27137c7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 5ef88c8f400b6c2ef5f5ca6bbd750c17f27137c7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8549) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493",
       "triggerID" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 8beabe2e1b04afe449610c44f4c376909b463e10 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493) 
   * 76ff23e27070dc54e6af85dd91c2742457621aa9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513164938



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/BulkFormatFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link BulkFormat} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface BulkFormatFactory extends DecodingFormatFactory<BulkFormat<RowData>> {
+	// interface is used for discovery but is already fully specified by the generics

Review comment:
       Remove this comment?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513205563



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -88,7 +92,16 @@ private FileSystemTableSource(
 	}
 
 	@Override
-	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+		Optional<BulkDecodingFormat<RowData>> bulkDecodingFormat = discoverBulkDecodingFormat();
+
+		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
+			// When this table has no partition, just return a empty source.
+			return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
+		} else if (bulkDecodingFormat.isPresent()) {
+			return SourceProvider.of(createBulkFormatSource(bulkDecodingFormat.get(), scanContext));
+		}
+
 		return new DataStreamScanProvider() {
 			@Override
 			public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {

Review comment:
       Then can we return `SourceFunctionProvider.of(InputFormatSourceFunction)` instead of `DataStreamScanProvider` here?
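       Something along these lines should do it (rough sketch for the bounded case; it assumes the existing `getInputFormat()` helper is kept):
   ```java
   // Fragment of getScanRuntimeProvider: wrap the legacy InputFormat into a bounded source function.
   InputFormat<RowData, ?> inputFormat = getInputFormat();
   TypeInformation<RowData> producedTypeInfo =
   		scanContext.createTypeInformation(getProducedDataType());
   return SourceFunctionProvider.of(
   		new InputFormatSourceFunction<>(inputFormat, producedTypeInfo), true);
   ```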




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493",
       "triggerID" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7565",
       "triggerID" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8362",
       "triggerID" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "05116d4768052821a37adfa725e60e40f4f71176",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8403",
       "triggerID" : "05116d4768052821a37adfa725e60e40f4f71176",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 315b69fafef254ff273bc36c713d28166260e4c6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8362) 
   * 05116d4768052821a37adfa725e60e40f4f71176 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8403) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493",
       "triggerID" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7565",
       "triggerID" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8362",
       "triggerID" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "05116d4768052821a37adfa725e60e40f4f71176",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8403",
       "triggerID" : "05116d4768052821a37adfa725e60e40f4f71176",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61fed9445fbd820423b73ef520d5f9ffc9e0606d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8475",
       "triggerID" : "61fed9445fbd820423b73ef520d5f9ffc9e0606d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa0dc913f56aa2567c30073c044f688f9ed74fee",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8485",
       "triggerID" : "fa0dc913f56aa2567c30073c044f688f9ed74fee",
       "triggerType" : "PUSH"
     }, {
       "hash" : "96468e31f7614ef314c655f97a63fcabe83505a8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8499",
       "triggerID" : "96468e31f7614ef314c655f97a63fcabe83505a8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 96468e31f7614ef314c655f97a63fcabe83505a8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8499) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-717723912


   Thanks a lot for your review! @wuchong 
   Wrapping `SerializationSchema` and `DeserializationSchema` for csv and json is a very interesting idea.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493",
       "triggerID" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "triggerType" : "PUSH"
     }, {
       "hash" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7565",
       "triggerID" : "76ff23e27070dc54e6af85dd91c2742457621aa9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8362",
       "triggerID" : "315b69fafef254ff273bc36c713d28166260e4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "05116d4768052821a37adfa725e60e40f4f71176",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8403",
       "triggerID" : "05116d4768052821a37adfa725e60e40f4f71176",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61fed9445fbd820423b73ef520d5f9ffc9e0606d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8475",
       "triggerID" : "61fed9445fbd820423b73ef520d5f9ffc9e0606d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa0dc913f56aa2567c30073c044f688f9ed74fee",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "fa0dc913f56aa2567c30073c044f688f9ed74fee",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 61fed9445fbd820423b73ef520d5f9ffc9e0606d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8475) 
   * fa0dc913f56aa2567c30073c044f688f9ed74fee UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513200486



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##########
@@ -53,15 +62,75 @@
 		context.getCatalogTable().getOptions().forEach(tableOptions::setString);
 		this.schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 		this.partitionKeys = context.getCatalogTable().getPartitionKeys();
-		this.path = new Path(context.getCatalogTable().getOptions().getOrDefault(PATH.key(), PATH.defaultValue()));
-		this.defaultPartName = context.getCatalogTable().getOptions().getOrDefault(
-				PARTITION_DEFAULT_NAME.key(), PARTITION_DEFAULT_NAME.defaultValue());
+		this.path = new Path(tableOptions.get(PATH));
+		this.defaultPartName = tableOptions.get(PARTITION_DEFAULT_NAME);
 	}
 
-	static FileSystemFormatFactory createFormatFactory(ReadableConfig tableOptions) {
+	ReadableConfig formatOptions(String identifier) {
+		return new DelegatingConfiguration(tableOptions, identifier + ".");
+	}
+
+	FileSystemFormatFactory createFormatFactory() {
 		return FactoryUtil.discoverFactory(
 				Thread.currentThread().getContextClassLoader(),
 				FileSystemFormatFactory.class,
 				tableOptions.get(FactoryUtil.FORMAT));
 	}
+
+	@SuppressWarnings("rawtypes")
+	<F extends EncodingFormatFactory<?>> Optional<EncodingFormat> discoverOptionalEncodingFormat(

Review comment:
       See the comments below:
   ```
   	/**
   	 * Unlike {@link FactoryUtil#discoverFactory}, it will not throw an exception if it cannot
   	 * find the factory.
   	 */
   ```
   Do you think we should modify `FactoryUtil.TableFactoryHelper#discoverOptionalEncodingFormat`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13605: [FLINK-19599][table] Introduce BulkFormatFactory&BulkWriterFactory to integrate new FileSource to table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13605:
URL: https://github.com/apache/flink/pull/13605#issuecomment-707521684


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a9df8a1384eb6306656b4fd952edd4be5d7a857d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493",
       "triggerID" : "8beabe2e1b04afe449610c44f4c376909b463e10",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a9df8a1384eb6306656b4fd952edd4be5d7a857d UNKNOWN
   * 8beabe2e1b04afe449610c44f4c376909b463e10 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7493) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org