You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/01/25 08:27:38 UTC

[flink] 03/09: [FLINK-25391][connector-files] Forward catalog table options

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9260311637ad47a6e67f154c629ddd49d9f262a
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Thu Jan 6 14:52:50 2022 +0100

    [FLINK-25391][connector-files] Forward catalog table options
---
 docs/content/docs/connectors/table/filesystem.md   | 80 +++++++++++++++++-----
 .../file/table/AbstractFileSystemTable.java        | 24 +++----
 .../file/table/FileSystemTableFactory.java         | 29 +++++++-
 .../connector/file/table/FileSystemTableSink.java  | 21 ++++--
 .../file/table/FileSystemTableSource.java          | 25 ++++---
 5 files changed, 131 insertions(+), 48 deletions(-)

diff --git a/docs/content/docs/connectors/table/filesystem.md b/docs/content/docs/connectors/table/filesystem.md
index 24dfa11..e2c0aeb 100644
--- a/docs/content/docs/connectors/table/filesystem.md
+++ b/docs/content/docs/connectors/table/filesystem.md
@@ -208,21 +208,27 @@ a timeout that specifies the maximum duration for which a file can be open.
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.rolling-policy.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">128MB</td>
         <td>MemorySize</td>
         <td>The maximum part file size before rolling.</td>
     </tr>
     <tr>
         <td><h5>sink.rolling-policy.rollover-interval</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">30 min</td>
         <td>Duration</td>
         <td>The maximum time duration a part file can stay open before rolling (by default 30 min to avoid to many small files).
@@ -230,6 +236,8 @@ a timeout that specifies the maximum duration for which a file can be open.
     </tr>
     <tr>
         <td><h5>sink.rolling-policy.check-interval</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">1 min</td>
         <td>Duration</td>
         <td>The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.</td>
@@ -250,21 +258,27 @@ The file sink supports file compactions, which allows applications to have small
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>auto-compaction</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">false</td>
         <td>Boolean</td>
         <td>Whether to enable automatic compaction in streaming sink or not. The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. The temporary files are invisible before compaction.</td>
     </tr>
     <tr>
         <td><h5>compaction.file-size</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>MemorySize</td>
         <td>The compaction target file size, the default value is the rolling file size.</td>
@@ -294,27 +308,35 @@ To define when to commit a partition, providing partition commit trigger:
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.partition-commit.trigger</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">process-time</td>
         <td>String</td>
         <td>Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time that extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.delay</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">0 s</td>
         <td>Duration</td>
         <td>The partition will not commit until the delay time. If it is a daily partition, should be '1 d', if it is a hourly partition, should be '1 h'.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">UTC</td>
         <td>String</td>
         <td>The time zone to parse the long watermark value to TIMESTAMP value, the parsed watermark timestamp is used to compare with partition time to decide the partition should commit or not. This option is only take effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. source rowtime is defined on TIMESTAMP_LTZ column, but this config is not configured, then users may see the partition committed after a few hours. Th [...]
@@ -356,33 +378,43 @@ Time extractors define extracting time from partition values.
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>partition.time-extractor.kind</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">default</td>
         <td>String</td>
         <td>Time extractor to extract time from partition values. Support default and custom. For default, can configure timestamp pattern\formatter. For custom, should configure extractor class.</td>
     </tr>
     <tr>
         <td><h5>partition.time-extractor.class</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>The extractor class for implement PartitionTimeExtractor interface.</td>
     </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-pattern</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-MM-dd hh:mm:ss' from first field. If timestamp should be extracted from a single partition field 'dt', can configure: '$dt'. If timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', can configure: '$year-$month-$day $hour:00:00'. If timestamp should be extracted from two partition fields 'dt' and 'hour', can [...]
     </tr>
     <tr>
         <td><h5>partition.time-extractor.timestamp-formatter</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">yyyy-MM-dd&nbsp;HH:mm:ss</td>
         <td>String</td>
         <td>The formatter that formats the partition timestamp string value to timestamp, the partition timestamp string value is expressed by 'partition.time-extractor.timestamp-pattern'. For example, the partition timestamp is extracted from multiple partition fields, say 'year', 'month' and 'day', you can configure 'partition.time-extractor.timestamp-pattern' to '$year$month$day', and configure `partition.time-extractor.timestamp-formatter` to 'yyyyMMdd'. By default the formatter is ' [...]
@@ -417,27 +449,35 @@ The partition commit policy defines what action is taken when partitions are com
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.partition-commit.policy.kind</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>Policy to commit a partition is to notify the downstream application that the partition has finished writing, the partition is ready to be read. metastore: add partition to metastore. Only hive table supports metastore policy, file system manages partitions through directory structure. success-file: add '_success' file to directory. Both can be configured at the same time: 'metastore,success-file'. custom: use policy class to create a commit policy. Support to configure multi [...]
     </tr>
     <tr>
         <td><h5>sink.partition-commit.policy.class</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>The partition commit policy class for implement PartitionCommitPolicy interface. Only work in custom commit policy.</td>
     </tr>
     <tr>
         <td><h5>sink.partition-commit.success-file.name</h5></td>
+        <td>optional</td>
+        <td>yes</td>
         <td style="word-wrap: break-word;">_SUCCESS</td>
         <td>String</td>
         <td>The file name for success-file partition commit policy, default is '_SUCCESS'.</td>
@@ -482,15 +522,19 @@ The parallelism of writing files into external file system (including Hive) can
 <table class="table table-bordered">
   <thead>
     <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
+        <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
     </tr>
   </thead>
   <tbody>
     <tr>
         <td><h5>sink.parallelism</h5></td>
+        <td>optional</td>
+        <td>no</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>Integer</td>
         <td>Parallelism of writing files into external file system. The value should greater than zero otherwise exception will be thrown.</td>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
index f78bee6..265c04a 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/AbstractFileSystemTable.java
@@ -25,7 +25,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.DataType;
 
 import java.util.List;
@@ -34,7 +33,6 @@ import java.util.stream.Collectors;
 /** Abstract File system table for providing some common methods. */
 abstract class AbstractFileSystemTable {
 
-    final DynamicTableFactory.Context context;
     final ObjectIdentifier tableIdentifier;
     final Configuration tableOptions;
     final ResolvedSchema schema;
@@ -43,16 +41,18 @@ abstract class AbstractFileSystemTable {
 
     List<String> partitionKeys;
 
-    AbstractFileSystemTable(DynamicTableFactory.Context context) {
-        this.context = context;
-        this.tableIdentifier = context.getObjectIdentifier();
-        this.tableOptions = new Configuration();
-        context.getCatalogTable().getOptions().forEach(tableOptions::setString);
-        this.schema = context.getCatalogTable().getResolvedSchema();
-        this.path = new Path(tableOptions.get(FileSystemConnectorOptions.PATH));
-        this.defaultPartName = tableOptions.get(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME);
-
-        this.partitionKeys = context.getCatalogTable().getPartitionKeys();
+    AbstractFileSystemTable(
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions) {
+        this.tableIdentifier = tableIdentifier;
+        this.tableOptions = (Configuration) tableOptions;
+        this.schema = schema;
+        this.path = new Path(this.tableOptions.get(FileSystemConnectorOptions.PATH));
+        this.defaultPartName =
+                this.tableOptions.get(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME);
+        this.partitionKeys = partitionKeys;
     }
 
     ReadableConfig formatOptions(String identifier) {
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
index 952c522..86a4ed3 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableFactory.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.time.ZoneId.SHORT_IDS;
 
@@ -70,7 +71,10 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
         return new FileSystemTableSource(
-                context,
+                context.getObjectIdentifier(),
+                context.getCatalogTable().getResolvedSchema(),
+                context.getCatalogTable().getPartitionKeys(),
+                helper.getOptions(),
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                 discoverDecodingFormat(context, DeserializationFormatFactory.class),
                 discoverFormatFactory(context));
@@ -81,7 +85,10 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
         validate(helper);
         return new FileSystemTableSink(
-                context,
+                context.getObjectIdentifier(),
+                context.getCatalogTable().getResolvedSchema(),
+                context.getCatalogTable().getPartitionKeys(),
+                helper.getOptions(),
                 discoverDecodingFormat(context, BulkReaderFormatFactory.class),
                 discoverDecodingFormat(context, DeserializationFormatFactory.class),
                 discoverFormatFactory(context),
@@ -122,6 +129,24 @@ public class FileSystemTableFactory implements DynamicTableSourceFactory, Dynami
         return options;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        FileSystemConnectorOptions.PATH,
+                        FileSystemConnectorOptions.PARTITION_DEFAULT_NAME,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_FILE_SIZE,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL,
+                        FileSystemConnectorOptions.SINK_ROLLING_POLICY_CHECK_INTERVAL,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_TRIGGER,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS,
+                        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME,
+                        FileSystemConnectorOptions.COMPACTION_FILE_SIZE)
+                .collect(Collectors.toSet());
+    }
+
     private void validate(FactoryUtil.TableFactoryHelper helper) {
         // Except format options, some formats like parquet and orc can not list all supported
         // options.
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
index 09705cd..6708b60 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java
@@ -55,6 +55,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
@@ -65,7 +67,6 @@ import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -117,20 +118,22 @@ public class FileSystemTableSink extends AbstractFileSystemTable
     @Nullable private Integer configuredParallelism;
 
     FileSystemTableSink(
-            DynamicTableFactory.Context context,
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions,
             @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat,
             @Nullable FileSystemFormatFactory formatFactory,
             @Nullable EncodingFormat<BulkWriter.Factory<RowData>> bulkWriterFormat,
             @Nullable EncodingFormat<SerializationSchema<RowData>> serializationFormat) {
-        super(context);
+        super(tableIdentifier, schema, partitionKeys, tableOptions);
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
         this.formatFactory = formatFactory;
         if (Stream.of(bulkWriterFormat, serializationFormat, formatFactory)
                 .allMatch(Objects::isNull)) {
-            Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
-            String identifier = options.get(FactoryUtil.FORMAT);
+            String identifier = tableOptions.get(FactoryUtil.FORMAT);
             throw new ValidationException(
                     String.format(
                             "Could not find any format factory for identifier '%s' in the classpath.",
@@ -138,7 +141,8 @@ public class FileSystemTableSink extends AbstractFileSystemTable
         }
         this.bulkWriterFormat = bulkWriterFormat;
         this.serializationFormat = serializationFormat;
-        this.configuredParallelism = tableOptions.get(FileSystemConnectorOptions.SINK_PARALLELISM);
+        this.configuredParallelism =
+                this.tableOptions.get(FileSystemConnectorOptions.SINK_PARALLELISM);
     }
 
     @Override
@@ -545,7 +549,10 @@ public class FileSystemTableSink extends AbstractFileSystemTable
     public DynamicTableSink copy() {
         FileSystemTableSink sink =
                 new FileSystemTableSink(
-                        context,
+                        tableIdentifier,
+                        schema,
+                        partitionKeys,
+                        tableOptions,
                         bulkReaderFormat,
                         deserializationFormat,
                         formatFactory,
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
index 5287f6f..f8cb703 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.java.io.CollectionInputFormat;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.FileSourceSplit;
@@ -35,6 +34,8 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.DecodingFormat;
@@ -52,7 +53,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
@@ -97,15 +97,17 @@ public class FileSystemTableSource extends AbstractFileSystemTable
     private DataType producedDataType;
 
     public FileSystemTableSource(
-            DynamicTableFactory.Context context,
+            ObjectIdentifier tableIdentifier,
+            ResolvedSchema schema,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions,
             @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat,
             @Nullable FileSystemFormatFactory formatFactory) {
-        super(context);
+        super(tableIdentifier, schema, partitionKeys, tableOptions);
         if (Stream.of(bulkReaderFormat, deserializationFormat, formatFactory)
                 .allMatch(Objects::isNull)) {
-            Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
-            String identifier = options.get(FactoryUtil.FORMAT);
+            String identifier = tableOptions.get(FactoryUtil.FORMAT);
             throw new ValidationException(
                     String.format(
                             "Could not find any format factory for identifier '%s' in the classpath.",
@@ -114,8 +116,7 @@ public class FileSystemTableSource extends AbstractFileSystemTable
         this.bulkReaderFormat = bulkReaderFormat;
         this.deserializationFormat = deserializationFormat;
         this.formatFactory = formatFactory;
-
-        this.producedDataType = context.getPhysicalRowDataType();
+        this.producedDataType = schema.toPhysicalRowDataType();
     }
 
     @Override
@@ -410,7 +411,13 @@ public class FileSystemTableSource extends AbstractFileSystemTable
     public FileSystemTableSource copy() {
         FileSystemTableSource source =
                 new FileSystemTableSource(
-                        context, bulkReaderFormat, deserializationFormat, formatFactory);
+                        tableIdentifier,
+                        schema,
+                        partitionKeys,
+                        tableOptions,
+                        bulkReaderFormat,
+                        deserializationFormat,
+                        formatFactory);
         source.partitionKeys = partitionKeys;
         source.remainingPartitions = remainingPartitions;
         source.filters = filters;