You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/02/09 23:02:43 UTC

[GitHub] [iceberg] stevenzwu commented on a change in pull request #2229: Flink : refactor iceberg table source and sink use flink new interface

stevenzwu commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r573310539



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java
##########
@@ -19,51 +19,57 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.List;
-import java.util.Map;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
-public class FlinkTableFactory implements TableSinkFactory<RowData>, TableSourceFactory<RowData> {
+public class FlinkTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+  public static final String IDENTIFIER = "iceberg";
   private final FlinkCatalog catalog;
 
   public FlinkTableFactory(FlinkCatalog catalog) {
     this.catalog = catalog;
   }
 
   @Override
-  public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
+  public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
     ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
     TableLoader tableLoader = createTableLoader(objectPath);
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    return new IcebergTableSource(tableLoader, tableSchema, context.getTable().getOptions(),
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    return new IcebergTableSource(tableLoader, tableSchema, context.getCatalogTable().getOptions(),
         context.getConfiguration());
   }
 
   @Override
-  public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
+  public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectPath objectPath = context.getObjectIdentifier().toObjectPath();
     TableLoader tableLoader = createTableLoader(objectPath);
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
-    return new IcebergTableSink(context.isBounded(), tableLoader, tableSchema);
+    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    return new IcebergTableSink(tableLoader, tableSchema);
   }
 
   @Override
-  public Map<String, String> requiredContext() {
+  public Set<ConfigOption<?>> requiredOptions() {

Review comment:
       Why throw an exception here? If there are no required or optional configs, we should just return empty sets, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org