You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/03 09:32:57 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #2229: Flink : refactor iceberg table source and sink use flink new interface

openinx commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r586179283



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
 package org.apache.iceberg.flink;
 
 import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.flink.sink.FlinkSink;
 
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
-  private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
   private final TableLoader tableLoader;
   private final TableSchema tableSchema;
 
   private boolean overwrite = false;
 
-  public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
-    this.isBounded = isBounded;
+  public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
     this.tableLoader = tableLoader;
     this.tableSchema = tableSchema;
   }
 
   @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
-    Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+    Preconditions

Review comment:
       Nit: It's better to format this code like this: 
   
   ```java
       Preconditions.checkState(!overwrite || context.isBounded(),
           "Unbounded data stream doesn't support overwrite operation.");
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
 package org.apache.iceberg.flink;
 
 import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.flink.sink.FlinkSink;
 
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
-  private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
   private final TableLoader tableLoader;
   private final TableSchema tableSchema;
 
   private boolean overwrite = false;
 
-  public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
-    this.isBounded = isBounded;
+  public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
     this.tableLoader = tableLoader;
     this.tableSchema = tableSchema;
   }
 
   @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
-    Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+    Preconditions
+        .checkState(!overwrite || context.isBounded(), "Unbounded data stream doesn't support overwrite operation.");
 
-    return FlinkSink.forRowData(dataStream)
+    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .overwrite(overwrite)

Review comment:
       In your MySQL CDC -> Iceberg test, did you provide the `equalityColumns` in the builder? I think we will need the `equalityColumns` to make this whole workflow work. We might need fully covered SQL unit tests in a future PR to ensure the whole flow works.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
 package org.apache.iceberg.flink;
 
 import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.flink.sink.FlinkSink;
 
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
-  private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
   private final TableLoader tableLoader;
   private final TableSchema tableSchema;
 
   private boolean overwrite = false;
 
-  public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
-    this.isBounded = isBounded;
+  public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
     this.tableLoader = tableLoader;
     this.tableSchema = tableSchema;
   }
 
   @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
-    Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+    Preconditions
+        .checkState(!overwrite || context.isBounded(), "Unbounded data stream doesn't support overwrite operation.");
 
-    return FlinkSink.forRowData(dataStream)
+    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .overwrite(overwrite)
         .build();
   }
 
   @Override
-  public DataType getConsumedDataType() {
-    return tableSchema.toRowDataType().bridgedTo(RowData.class);
+  public void applyStaticPartition(Map<String, String> partition) {
+    // The flink's PartitionFanoutWriter will handle the static partition write policy automatically.
   }
 
   @Override
-  public TableSchema getTableSchema() {
-    return this.tableSchema;
+  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+    return ChangelogMode.newBuilder()
+        .addContainedKind(RowKind.INSERT)
+        .addContainedKind(RowKind.UPDATE_BEFORE)
+        .addContainedKind(RowKind.UPDATE_AFTER)
+        .addContainedKind(RowKind.DELETE)
+        .build();
   }
 
   @Override
-  public TableSink<RowData> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-    // This method has been deprecated and it will be removed in future version, so left the empty implementation here.
-    return this;
+  public DynamicTableSink copy() {
+    IcebergTableSink icebergTableSink = new IcebergTableSink(tableLoader, tableSchema);
+    icebergTableSink.overwrite = overwrite;
+    return icebergTableSink;
   }
 
   @Override
-  public void setOverwrite(boolean overwrite) {
-    this.overwrite = overwrite;
+  public String asSummaryString() {
+    return "iceberg table sink";

Review comment:
       Nit:  It's better to use "Iceberg table sink" here  ( to align with other flink table sink ).

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -40,14 +40,12 @@
   private static final String CATALOG_NAME = "test_catalog";
   private static final String DATABASE_NAME = "test_db";
   private static final String TABLE_NAME = "test_table";
-  private final String expectedFilterPushDownExplain = "FilterPushDown";

Review comment:
       Yeah,  that sounds reasonable.  the `lastScanEvent.filter().toString()` is enough to validate the pushed down filters. 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -78,17 +77,11 @@ private IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, S
   }
 
   @Override
-  public boolean isBounded() {
-    return FlinkSource.isBounded(properties);
+  public void applyProjection(int[][] projectFields) {

Review comment:
       As we don't implement nested projection in the iceberg source, I think we could add a check here to ensure that the projections are not nested: 
   
   ```java
     @Override
     public void applyProjection(int[][] projectFields) {
       this.projectedFields = new int[projectFields.length];
       for (int i = 0; i < projectFields.length; i++) {
         Preconditions.checkArgument(projectFields[i].length == 1,
             "Don't support nested projection in iceberg source now.");
   
         this.projectedFields[i] = projectFields[i][0];
       }
     }
   
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -78,17 +77,11 @@ private IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, S
   }
 
   @Override
-  public boolean isBounded() {
-    return FlinkSource.isBounded(properties);
+  public void applyProjection(int[][] projectFields) {
+    this.projectedFields = Arrays.stream(projectFields).mapToInt(value -> value[0]).toArray();
   }
 
-  @Override
-  public TableSource<RowData> projectFields(int[] fields) {
-    return new IcebergTableSource(loader, schema, properties, fields, isLimitPushDown, limit, filters, readableConfig);
-  }
-
-  @Override
-  public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
+  private DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {

Review comment:
       Nit:  It's better to rename it as `createDataStream` 

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
 package org.apache.iceberg.flink;
 
 import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.flink.sink.FlinkSink;
 
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
-  private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
   private final TableLoader tableLoader;
   private final TableSchema tableSchema;
 
   private boolean overwrite = false;
 
-  public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
-    this.isBounded = isBounded;
+  public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
     this.tableLoader = tableLoader;
     this.tableSchema = tableSchema;
   }
 
   @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
-    Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+    Preconditions
+        .checkState(!overwrite || context.isBounded(), "Unbounded data stream doesn't support overwrite operation.");
 
-    return FlinkSink.forRowData(dataStream)
+    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .overwrite(overwrite)
         .build();
   }
 
   @Override
-  public DataType getConsumedDataType() {
-    return tableSchema.toRowDataType().bridgedTo(RowData.class);
+  public void applyStaticPartition(Map<String, String> partition) {
+    // The flink's PartitionFanoutWriter will handle the static partition write policy automatically.
   }
 
   @Override
-  public TableSchema getTableSchema() {
-    return this.tableSchema;
+  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+    return ChangelogMode.newBuilder()
+        .addContainedKind(RowKind.INSERT)
+        .addContainedKind(RowKind.UPDATE_BEFORE)
+        .addContainedKind(RowKind.UPDATE_AFTER)
+        .addContainedKind(RowKind.DELETE)
+        .build();
   }
 
   @Override
-  public TableSink<RowData> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-    // This method has been deprecated and it will be removed in future version, so left the empty implementation here.
-    return this;
+  public DynamicTableSink copy() {
+    IcebergTableSink icebergTableSink = new IcebergTableSink(tableLoader, tableSchema);

Review comment:
       How about introducing a new IcebergTableSink constructor like the following ? 
   
   ```java
     private IcebergTableSink(IcebergTableSink toCopy) {
       this.tableLoader = toCopy.tableLoader;
       this.tableSchema = toCopy.tableSchema;
       this.overwrite = toCopy.overwrite;
     }
   ```
   
   Then we could just return `new IcebergTableSink(this)` here. We usually use this pattern in iceberg to copy an object; see also [StructCopy](https://github.com/apache/iceberg/blob/cde7ec33a075bba95583eb1a5d393880d141b04f/core/src/main/java/org/apache/iceberg/io/StructCopy.java#L28).

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -100,70 +93,73 @@ public boolean isBounded() {
         .build();
   }
 
-  @Override
-  public TableSchema getTableSchema() {
-    return schema;
-  }
-
-  @Override
-  public DataType getProducedDataType() {
-    return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
-  }
-
   private TableSchema getProjectedSchema() {
-    TableSchema fullSchema = getTableSchema();
     if (projectedFields == null) {
-      return fullSchema;
+      return schema;
     } else {
-      String[] fullNames = fullSchema.getFieldNames();
-      DataType[] fullTypes = fullSchema.getFieldDataTypes();
+      String[] fullNames = schema.getFieldNames();
+      DataType[] fullTypes = schema.getFieldDataTypes();
       return TableSchema.builder().fields(
           Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
           Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
     }
   }
 
   @Override
-  public String explainSource() {
-    String explain = "Iceberg table: " + loader.toString();
-    if (projectedFields != null) {
-      explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
-    }
-
-    if (isLimitPushDown) {
-      explain += String.format(", LimitPushDown : %d", limit);
-    }
+  public void applyLimit(long newLimit) {
+    this.limit = newLimit;
+  }
 
-    if (isFilterPushedDown()) {
-      explain += String.format(", FilterPushDown: %s", COMMA.join(filters));
+  @Override
+  public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+    List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+    List<Expression> expressions = Lists.newArrayList();
+
+    for (ResolvedExpression resolvedExpression : flinkFilters) {
+      Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+      if (icebergExpression.isPresent()) {
+        expressions.add(icebergExpression.get());
+        acceptedFilters.add(resolvedExpression);
+      }
     }
 
-    return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
+    this.filters = expressions;
+    return Result.of(acceptedFilters, flinkFilters);
   }
 
   @Override
-  public boolean isLimitPushedDown() {
-    return isLimitPushDown;
+  public boolean supportsNestedProjection() {
+    return false;
   }
 
   @Override
-  public TableSource<RowData> applyLimit(long newLimit) {
-    return new IcebergTableSource(loader, schema, properties, projectedFields, true, newLimit, filters, readableConfig);
+  public ChangelogMode getChangelogMode() {
+    return ChangelogMode.insertOnly();
   }
 
   @Override
-  public TableSource<RowData> applyPredicate(List<Expression> predicates) {
-    List<org.apache.iceberg.expressions.Expression> expressions = Lists.newArrayList();
-    for (Expression predicate : predicates) {
-      FlinkFilters.convert(predicate).ifPresent(expressions::add);
-    }
+  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+    return new DataStreamScanProvider() {
+      @Override
+      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+        return getDataStream(execEnv);
+      }
+
+      @Override
+      public boolean isBounded() {
+        return FlinkSource.isBounded(properties);
+      }
+    };
+  }
 
-    return new IcebergTableSource(loader, schema, properties, projectedFields, isLimitPushDown, limit, expressions,
+  @Override
+  public DynamicTableSource copy() {
+    return new IcebergTableSource(loader, schema, properties, projectedFields, isLimitPushDown, limit, filters,

Review comment:
       Nit: we could use a similar `IcebergTableSource(toCopy)` constructor to copy the object.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
##########
@@ -19,48 +19,53 @@
 
 package org.apache.iceberg.flink;
 
-import java.util.List;
-import java.util.Map;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
-public class FlinkTableFactory implements TableSinkFactory<RowData>, TableSourceFactory<RowData> {
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
   private final FlinkCatalog catalog;
 
-  public FlinkTableFactory(FlinkCatalog catalog) {
+  public FlinkDynamicTableFactory(FlinkCatalog catalog) {
     this.catalog = catalog;
   }
 
   @Override
-  public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
+  public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {

Review comment:
       Nit:  Here we could just use the `Context` to replace the `DynamicTableFactory.Context` ? 

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -56,6 +58,28 @@ public TestFlinkTableSource() {
     }, ScanEvent.class);
   }
 
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        if (tEnv == null) {
+          EnvironmentSettings settings = EnvironmentSettings
+              .newInstance()
+              .useBlinkPlanner()
+              .inBatchMode()
+              .build();
+
+          TableEnvironment env = TableEnvironment.create(settings);
+          env.getConfig().getConfiguration()
+              .set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false)
+              .set(CoreOptions.DEFAULT_PARALLELISM, 1);

Review comment:
       The default parallelism is already 1, so why do we need to implement `getTableEnv` and set `DEFAULT_PARALLELISM` to 1 again?
   
   ```java
       public static final ConfigOption<Integer> DEFAULT_PARALLELISM =
               ConfigOptions.key("parallelism.default")
                       .defaultValue(1)
                       .withDescription("Default parallelism for jobs.");
   ```

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -100,70 +93,73 @@ public boolean isBounded() {
         .build();
   }
 
-  @Override
-  public TableSchema getTableSchema() {
-    return schema;
-  }
-
-  @Override
-  public DataType getProducedDataType() {
-    return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
-  }
-
   private TableSchema getProjectedSchema() {
-    TableSchema fullSchema = getTableSchema();
     if (projectedFields == null) {
-      return fullSchema;
+      return schema;
     } else {
-      String[] fullNames = fullSchema.getFieldNames();
-      DataType[] fullTypes = fullSchema.getFieldDataTypes();
+      String[] fullNames = schema.getFieldNames();
+      DataType[] fullTypes = schema.getFieldDataTypes();
       return TableSchema.builder().fields(
           Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
           Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
     }
   }
 
   @Override
-  public String explainSource() {
-    String explain = "Iceberg table: " + loader.toString();
-    if (projectedFields != null) {
-      explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
-    }
-
-    if (isLimitPushDown) {
-      explain += String.format(", LimitPushDown : %d", limit);
-    }
+  public void applyLimit(long newLimit) {
+    this.limit = newLimit;
+  }
 
-    if (isFilterPushedDown()) {
-      explain += String.format(", FilterPushDown: %s", COMMA.join(filters));
+  @Override
+  public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+    List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+    List<Expression> expressions = Lists.newArrayList();
+
+    for (ResolvedExpression resolvedExpression : flinkFilters) {
+      Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+      if (icebergExpression.isPresent()) {
+        expressions.add(icebergExpression.get());
+        acceptedFilters.add(resolvedExpression);
+      }
     }
 
-    return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
+    this.filters = expressions;
+    return Result.of(acceptedFilters, flinkFilters);

Review comment:
       I read the javadoc of [SupportsFilterPushDown](https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsFilterPushDown.java#L63): the `remainingFilters` don't have to be the full list of the original `flinkFilters`; they could be just the filters that were not pushed down to the iceberg source.
   
   Although it is surely correct to pass the complete list as `remainingFilters`, that seems to spend extra resources applying the `acceptedFilters` twice. 

##########
File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,15 +280,15 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
       Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
     }
     List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
-    Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());

Review comment:
       Where did we change the default parallelism from 1 to 4? I did not find that change.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -100,70 +93,73 @@ public boolean isBounded() {
         .build();
   }
 
-  @Override
-  public TableSchema getTableSchema() {
-    return schema;
-  }
-
-  @Override
-  public DataType getProducedDataType() {
-    return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
-  }
-
   private TableSchema getProjectedSchema() {
-    TableSchema fullSchema = getTableSchema();
     if (projectedFields == null) {
-      return fullSchema;
+      return schema;
     } else {
-      String[] fullNames = fullSchema.getFieldNames();
-      DataType[] fullTypes = fullSchema.getFieldDataTypes();
+      String[] fullNames = schema.getFieldNames();
+      DataType[] fullTypes = schema.getFieldDataTypes();
       return TableSchema.builder().fields(
           Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
           Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
     }
   }
 
   @Override
-  public String explainSource() {
-    String explain = "Iceberg table: " + loader.toString();
-    if (projectedFields != null) {
-      explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
-    }
-
-    if (isLimitPushDown) {
-      explain += String.format(", LimitPushDown : %d", limit);
-    }
+  public void applyLimit(long newLimit) {
+    this.limit = newLimit;
+  }
 
-    if (isFilterPushedDown()) {
-      explain += String.format(", FilterPushDown: %s", COMMA.join(filters));
+  @Override
+  public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+    List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+    List<Expression> expressions = Lists.newArrayList();
+
+    for (ResolvedExpression resolvedExpression : flinkFilters) {
+      Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+      if (icebergExpression.isPresent()) {
+        expressions.add(icebergExpression.get());
+        acceptedFilters.add(resolvedExpression);
+      }
     }
 
-    return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
+    this.filters = expressions;
+    return Result.of(acceptedFilters, flinkFilters);
   }
 
   @Override
-  public boolean isLimitPushedDown() {
-    return isLimitPushDown;
+  public boolean supportsNestedProjection() {
+    return false;

Review comment:
       Could you please add a TODO indicating that we need to support nested projection in a future PR? Iceberg actually has the ability to support nested projection now, so we could address this in the next PR.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -100,70 +93,73 @@ public boolean isBounded() {
         .build();
   }
 
-  @Override
-  public TableSchema getTableSchema() {
-    return schema;
-  }
-
-  @Override
-  public DataType getProducedDataType() {
-    return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
-  }
-
   private TableSchema getProjectedSchema() {
-    TableSchema fullSchema = getTableSchema();
     if (projectedFields == null) {
-      return fullSchema;
+      return schema;
     } else {
-      String[] fullNames = fullSchema.getFieldNames();
-      DataType[] fullTypes = fullSchema.getFieldDataTypes();
+      String[] fullNames = schema.getFieldNames();
+      DataType[] fullTypes = schema.getFieldDataTypes();
       return TableSchema.builder().fields(
           Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
           Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
     }
   }
 
   @Override
-  public String explainSource() {
-    String explain = "Iceberg table: " + loader.toString();
-    if (projectedFields != null) {
-      explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
-    }
-
-    if (isLimitPushDown) {
-      explain += String.format(", LimitPushDown : %d", limit);
-    }
+  public void applyLimit(long newLimit) {
+    this.limit = newLimit;
+  }
 
-    if (isFilterPushedDown()) {
-      explain += String.format(", FilterPushDown: %s", COMMA.join(filters));
+  @Override
+  public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+    List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+    List<Expression> expressions = Lists.newArrayList();
+
+    for (ResolvedExpression resolvedExpression : flinkFilters) {
+      Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+      if (icebergExpression.isPresent()) {
+        expressions.add(icebergExpression.get());
+        acceptedFilters.add(resolvedExpression);
+      }
     }
 
-    return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
+    this.filters = expressions;
+    return Result.of(acceptedFilters, flinkFilters);
   }
 
   @Override
-  public boolean isLimitPushedDown() {
-    return isLimitPushDown;
+  public boolean supportsNestedProjection() {
+    return false;
   }
 
   @Override
-  public TableSource<RowData> applyLimit(long newLimit) {
-    return new IcebergTableSource(loader, schema, properties, projectedFields, true, newLimit, filters, readableConfig);
+  public ChangelogMode getChangelogMode() {
+    return ChangelogMode.insertOnly();
   }
 
   @Override
-  public TableSource<RowData> applyPredicate(List<Expression> predicates) {
-    List<org.apache.iceberg.expressions.Expression> expressions = Lists.newArrayList();
-    for (Expression predicate : predicates) {
-      FlinkFilters.convert(predicate).ifPresent(expressions::add);
-    }
+  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+    return new DataStreamScanProvider() {
+      @Override
+      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+        return getDataStream(execEnv);
+      }
+
+      @Override
+      public boolean isBounded() {
+        return FlinkSource.isBounded(properties);
+      }
+    };
+  }
 
-    return new IcebergTableSource(loader, schema, properties, projectedFields, isLimitPushDown, limit, expressions,
+  @Override
+  public DynamicTableSource copy() {
+    return new IcebergTableSource(loader, schema, properties, projectedFields, isLimitPushDown, limit, filters,
         readableConfig);
   }
 
   @Override
-  public boolean isFilterPushedDown() {
-    return this.filters != null && this.filters.size() > 0;
+  public String asSummaryString() {
+    return "iceberg table source";

Review comment:
       Nit:  use `Iceberg table source` ( to align with other existing flink table source ).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org