Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/04/19 07:29:40 UTC

[iceberg] branch master updated: Flink: Apply row level filtering (#7109)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c57952ecac Flink: Apply row level filtering (#7109)
c57952ecac is described below

commit c57952ecac0f55d978228df238916f0239160bb1
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Wed Apr 19 09:29:33 2023 +0200

    Flink: Apply row level filtering (#7109)
    
    * Flink: Apply row level filtering
    
    * Fix the tests
    
    * Add test for case-sensitive
    
    * Reduce duplication using a private method
---
 .../apache/iceberg/data/GenericAppenderHelper.java |  7 ++++
 .../apache/iceberg/flink/FlinkSourceFilter.java    | 49 ++++++++++++++++++++++
 .../iceberg/flink/source/FlinkInputFormat.java     |  6 ++-
 .../apache/iceberg/flink/source/IcebergSource.java |  3 +-
 .../flink/source/RowDataFileScanTaskReader.java    | 23 +++++++++-
 .../iceberg/flink/source/RowDataRewriter.java      |  4 +-
 .../reader/AvroGenericRecordReaderFunction.java    | 10 +++--
 .../flink/source/reader/RowDataReaderFunction.java |  9 +++-
 .../apache/iceberg/flink/source/TestFlinkScan.java | 43 +++++++++++++++++--
 .../iceberg/flink/source/TestFlinkSource.java      |  9 +++-
 .../flink/source/TestIcebergSourceBounded.java     |  9 ++--
 .../TestIcebergSourceBoundedGenericRecord.java     |  3 +-
 .../iceberg/flink/source/reader/ReaderUtil.java    |  4 +-
 .../source/reader/TestIcebergSourceReader.java     |  4 +-
 .../source/reader/TestRowDataReaderFunction.java   |  4 +-
 15 files changed, 166 insertions(+), 21 deletions(-)
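
A note on the shape of the change before the per-file diffs: filter
expressions pushed into the scan are now also evaluated against individual
rows, so rows that survive file- and partition-level pruning but fail the
predicate are dropped before they reach the Flink pipeline. A minimal sketch
of that evaluation path using Iceberg's public expression API (the one-column
schema and the filter list are illustrative, not taken from this commit):

    import java.util.Collections;
    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.expressions.Evaluator;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    public class ResidualFilterSketch {
      public static void main(String[] args) {
        Schema schema =
            new Schema(Types.NestedField.required(1, "data", Types.StringType.get()));

        // Reduce all pushed-down filters into one expression, the same way
        // RowDataFileScanTaskReader does below.
        List<Expression> filters =
            Collections.singletonList(Expressions.greaterThanOrEqual("data", "b"));
        Expression combined =
            filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);

        // Evaluate the combined expression row by row.
        Evaluator evaluator = new Evaluator(schema.asStruct(), combined, true);

        GenericRecord row = GenericRecord.create(schema);
        row.setField("data", "c");
        System.out.println(evaluator.eval(row));  // true: "c" >= "b"
      }
    }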

diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
index eb8aefb046..03e220f10e 100644
--- a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
+++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
@@ -91,6 +91,13 @@ public class GenericAppenderHelper {
     appendToTable(writeFile(partition, records));
   }
 
+  public DataFile writeFile(List<Record> records) throws IOException {
+    Preconditions.checkNotNull(table, "table not set");
+    File file = tmp.newFile();
+    Assert.assertTrue(file.delete());
+    return appendToLocalFile(table, file, fileFormat, null, records, conf);
+  }
+
   public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
     Preconditions.checkNotNull(table, "table not set");
     File file = tmp.newFile();
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java
new file mode 100644
index 0000000000..5fbd84909d
--- /dev/null
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final RowType rowType;
+  private final Evaluator evaluator;
+  private final Types.StructType struct;
+  private volatile RowDataWrapper wrapper;
+
+  public FlinkSourceFilter(Schema schema, Expression expr, boolean caseSensitive) {
+    this.rowType = FlinkSchemaUtil.convert(schema);
+    this.struct = schema.asStruct();
+    this.evaluator = new Evaluator(struct, expr, caseSensitive);
+  }
+
+  @Override
+  public boolean filter(RowData value) {
+    if (wrapper == null) {
+      this.wrapper = new RowDataWrapper(rowType, struct);
+    }
+    return evaluator.eval(wrapper.wrap(value));
+  }
+}
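
The wrapper above is volatile and built lazily inside filter() rather than in
the constructor, so the RowDataWrapper does not have to travel with the
serialized FilterFunction; it is created on first use, after the function has
been shipped to a task. A short sketch of the class in isolation (schema and
values are made up for illustration):

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.flink.FlinkSourceFilter;
    import org.apache.iceberg.types.Types;

    public class FlinkSourceFilterSketch {
      public static void main(String[] args) {
        Schema schema =
            new Schema(Types.NestedField.required(1, "dt", Types.StringType.get()));
        FlinkSourceFilter filter =
            new FlinkSourceFilter(schema, Expressions.equal("dt", "2020-03-20"), true);

        // The internal RowDataWrapper is created on the first filter() call.
        System.out.println(
            filter.filter(GenericRowData.of(StringData.fromString("2020-03-20"))));  // true
        System.out.println(
            filter.filter(GenericRowData.of(StringData.fromString("2020-03-21"))));  // false
      }
    }
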
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 56fdc61919..9a5123dc48 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -69,7 +69,11 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
     } else {
       this.rowDataReader =
           new RowDataFileScanTaskReader(
-              tableSchema, context.project(), context.nameMapping(), context.caseSensitive());
+              tableSchema,
+              context.project(),
+              context.nameMapping(),
+              context.caseSensitive(),
+              context.filters());
     }
   }
 
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index d75e79abf3..0675305e10 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -449,7 +449,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
                   context.nameMapping(),
                   context.caseSensitive(),
                   table.io(),
-                  table.encryption());
+                  table.encryption(),
+                  context.filters());
           this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
         }
       }
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 5fada27d54..88364f4e87 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.source;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
@@ -29,7 +30,10 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkSourceFilter;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.data.FlinkAvroReader;
 import org.apache.iceberg.flink.data.FlinkOrcReader;
@@ -54,13 +58,27 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
+  private final FlinkSourceFilter rowFilter;
 
   public RowDataFileScanTaskReader(
-      Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
+      Schema tableSchema,
+      Schema projectedSchema,
+      String nameMapping,
+      boolean caseSensitive,
+      List<Expression> filters) {
     this.tableSchema = tableSchema;
     this.projectedSchema = projectedSchema;
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
+
+    if (filters != null && !filters.isEmpty()) {
+      Expression combinedExpression =
+          filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
+      this.rowFilter =
+          new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive);
+    } else {
+      this.rowFilter = null;
+    }
   }
 
   @Override
@@ -120,6 +138,9 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
       }
     }
 
+    if (rowFilter != null) {
+      return CloseableIterable.filter(iter, rowFilter::filter);
+    }
     return iter;
   }
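
Two details in the hunks above are worth calling out. Expressions.alwaysTrue()
is the identity of the reduction, so even an empty (but non-null) filter list
would combine into a harmless no-op predicate, although the constructor
short-circuits that case to rowFilter = null. And CloseableIterable.filter
wraps the source iterable lazily while preserving its close() semantics, so
the row filter composes with the delete filter without extra buffering. A
sketch of the same utility on plain values (the integers are illustrative):

    import java.util.Arrays;
    import org.apache.iceberg.io.CloseableIterable;

    public class CloseableFilterSketch {
      public static void main(String[] args) throws Exception {
        CloseableIterable<Integer> rows =
            CloseableIterable.withNoopClose(Arrays.asList(1, 2, 3, 4));
        // Filtering is lazy; closing the wrapper closes the underlying iterable.
        try (CloseableIterable<Integer> kept =
            CloseableIterable.filter(rows, i -> i % 2 == 0)) {
          kept.forEach(System.out::println);  // 2, 4
        }
      }
    }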
 
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 23665b7c9f..c958604c00 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source;
 import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -125,7 +126,8 @@ public class RowDataRewriter {
       this.encryptionManager = encryptionManager;
       this.taskWriterFactory = taskWriterFactory;
       this.rowDataReader =
-          new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
+          new RowDataFileScanTaskReader(
+              schema, schema, nameMapping, caseSensitive, Collections.emptyList());
     }
 
     @Override
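
RowDataRewriter only reads files back in order to compact and rewrite them,
so it passes Collections.emptyList() here: no row-level filtering applies
during a rewrite, and the empty list keeps rowFilter null in the task reader.
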
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
index 66c0e0ff23..66e59633ff 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
@@ -18,12 +18,14 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
+import java.util.List;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
 import org.apache.iceberg.flink.source.DataIterator;
 import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
@@ -54,7 +56,8 @@ public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<
         null,
         false,
         table.io(),
-        table.encryption());
+        table.encryption(),
+        null);
   }
 
   public AvroGenericRecordReaderFunction(
@@ -65,14 +68,15 @@ public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<
       String nameMapping,
       boolean caseSensitive,
       FileIO io,
-      EncryptionManager encryption) {
+      EncryptionManager encryption,
+      List<Expression> filters) {
     super(new ListDataIteratorBatcher<>(config));
     this.tableName = tableName;
     this.readSchema = readSchema(tableSchema, projectedSchema);
     this.io = io;
     this.encryption = encryption;
     this.rowDataReader =
-        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive);
+        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters);
   }
 
   @Override
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
index c747375d2a..5d0a00954e 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
@@ -18,10 +18,12 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
+import java.util.List;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.source.DataIterator;
 import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
@@ -36,6 +38,7 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
   private final boolean caseSensitive;
   private final FileIO io;
   private final EncryptionManager encryption;
+  private final List<Expression> filters;
 
   public RowDataReaderFunction(
       ReadableConfig config,
@@ -44,7 +47,8 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
       String nameMapping,
       boolean caseSensitive,
       FileIO io,
-      EncryptionManager encryption) {
+      EncryptionManager encryption,
+      List<Expression> filters) {
     super(
         new ArrayPoolDataIteratorBatcher<>(
             config,
@@ -56,12 +60,13 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
     this.caseSensitive = caseSensitive;
     this.io = io;
     this.encryption = encryption;
+    this.filters = filters;
   }
 
   @Override
   public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
     return new DataIterator<>(
-        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive),
+        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters),
         split.task(),
         io,
         encryption);
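
The task reader treats null and an empty list the same way (its constructor
checks filters != null && !filters.isEmpty()), which is why the convenience
constructor of AvroGenericRecordReaderFunction above can pass null while test
callers pass Collections.emptyList(). The filters are kept in a field here
because createDataIterator builds a fresh RowDataFileScanTaskReader for every
split.
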
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index a6cdc212b7..aa5b51eddf 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -88,7 +88,12 @@ public abstract class TestFlinkScan {
 
   protected abstract List<Row> runWithProjection(String... projected) throws Exception;
 
-  protected abstract List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception;
+  protected abstract List<Row> runWithFilter(
+      Expression filter, String sqlFilter, boolean caseSensitive) throws Exception;
+
+  protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
+    return runWithFilter(filter, sqlFilter, true);
+  }
 
   protected abstract List<Row> runWithOptions(Map<String, String> options) throws Exception;
 
@@ -409,7 +414,7 @@ public abstract class TestFlinkScan {
   }
 
   @Test
-  public void testFilterExp() throws Exception {
+  public void testFilterExpPartition() throws Exception {
     Table table =
         catalogResource
             .catalog()
@@ -428,11 +433,43 @@ public abstract class TestFlinkScan {
             RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
     helper.appendToTable(dataFile1, dataFile2);
     TestHelpers.assertRecords(
-        runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'"),
+        runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'", true),
         expectedRecords,
         TestFixtures.SCHEMA);
   }
 
+  private void testFilterExp(Expression filter, String sqlFilter, boolean caseSensitive)
+      throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
+    expectedRecords.get(0).set(0, "a");
+    expectedRecords.get(1).set(0, "b");
+    expectedRecords.get(2).set(0, "c");
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(expectedRecords);
+    helper.appendToTable(dataFile);
+
+    List<Row> actual =
+        runWithFilter(filter, sqlFilter, caseSensitive);
+
+    TestHelpers.assertRecords(actual, expectedRecords.subList(1, 3), TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testFilterExp() throws Exception {
+    testFilterExp(Expressions.greaterThanOrEqual("data", "b"), "where data>='b'", true);
+  }
+
+  @Test
+  public void testFilterExpCaseInsensitive() throws Exception {
+    // sqlFilter does not support case-insensitive filtering:
+    // https://issues.apache.org/jira/browse/FLINK-16175
+    testFilterExp(Expressions.greaterThanOrEqual("DATA", "b"), "where data>='b'", false);
+  }
+
   @Test
   public void testPartitionTypes() throws Exception {
     Schema typesSchema =
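
The case-insensitive test above exercises name binding, not value comparison:
"DATA" only resolves against the lower-case schema field "data" when the
evaluator is constructed with caseSensitive = false. A small sketch of that
binding behavior (schema and values are illustrative):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.expressions.Evaluator;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    public class CaseSensitivitySketch {
      public static void main(String[] args) {
        Schema schema =
            new Schema(Types.NestedField.required(1, "data", Types.StringType.get()));
        GenericRecord row = GenericRecord.create(schema);
        row.setField("data", "c");

        // caseSensitive = false lets "DATA" bind to the field "data".
        Evaluator insensitive =
            new Evaluator(
                schema.asStruct(), Expressions.greaterThanOrEqual("DATA", "b"), false);
        System.out.println(insensitive.eval(row));  // true

        // With caseSensitive = true the same expression would fail to bind and
        // the Evaluator constructor would throw a ValidationException.
      }
    }
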
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
index cebce61c08..2b55bee6e5 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
@@ -51,15 +51,20 @@ public abstract class TestFlinkSource extends TestFlinkScan {
   }
 
   @Override
-  protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
+  protected List<Row> runWithFilter(Expression filter, String sqlFilter, boolean caseSensitive)
+      throws Exception {
     FlinkSource.Builder builder =
         FlinkSource.forRowData().filters(Collections.singletonList(filter));
-    return run(builder, Maps.newHashMap(), sqlFilter, "*");
+    Map<String, String> options = Maps.newHashMap();
+    options.put("case-sensitive", Boolean.toString(caseSensitive));
+    return run(builder, options, sqlFilter, "*");
   }
 
   @Override
   protected List<Row> runWithOptions(Map<String, String> options) throws Exception {
     FlinkSource.Builder builder = FlinkSource.forRowData();
+    Optional.ofNullable(options.get("case-sensitive"))
+        .ifPresent(value -> builder.caseSensitive(Boolean.parseBoolean(value)));
     Optional.ofNullable(options.get("snapshot-id"))
         .ifPresent(value -> builder.snapshotId(Long.parseLong(value)));
     Optional.ofNullable(options.get("tag")).ifPresent(value -> builder.tag(value));
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index 477d121316..a80f87d648 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iceberg.flink.source;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -68,8 +68,11 @@ public class TestIcebergSourceBounded extends TestFlinkScan {
   }
 
   @Override
-  protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
-    return run(null, Arrays.asList(filter), Maps.newHashMap(), sqlFilter, "*");
+  protected List<Row> runWithFilter(Expression filter, String sqlFilter, boolean caseSensitive)
+      throws Exception {
+    Map<String, String> options = Maps.newHashMap();
+    options.put("case-sensitive", Boolean.toString(caseSensitive));
+    return run(null, Collections.singletonList(filter), options, sqlFilter, "*");
   }
 
   @Override
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
index 4ffd92acee..0337f35970 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
@@ -163,7 +163,8 @@ public class TestIcebergSourceBoundedGenericRecord {
             null,
             false,
             table.io(),
-            table.encryption());
+            table.encryption(),
+            filters);
 
     IcebergSource.Builder<GenericRecord> sourceBuilder =
         IcebergSource.<GenericRecord>builder()
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f2e89428a9..f9ceaf8422 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.source.reader;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.BaseCombinedScanTask;
@@ -83,7 +84,8 @@ public class ReaderUtil {
 
   public static DataIterator<RowData> createDataIterator(CombinedScanTask combinedTask) {
     return new DataIterator<>(
-        new RowDataFileScanTaskReader(TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true),
+        new RowDataFileScanTaskReader(
+            TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList()),
         combinedTask,
         new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
         new PlaintextEncryptionManager());
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index ddc144be88..56af0caf12 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.source.reader;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
@@ -106,7 +107,8 @@ public class TestIcebergSourceReader {
             null,
             true,
             new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-            new PlaintextEncryptionManager());
+            new PlaintextEncryptionManager(),
+            Collections.emptyList());
     return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
   }
 }
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
index aee271a3a7..d063ad7f4a 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.flink.configuration.Configuration;
@@ -55,7 +56,8 @@ public class TestRowDataReaderFunction extends ReaderFunctionTestBase<RowData> {
         null,
         true,
         new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-        new PlaintextEncryptionManager());
+        new PlaintextEncryptionManager(),
+        Collections.emptyList());
   }
 
   @Override