Posted to issues@iceberg.apache.org by "Fokko (via GitHub)" <gi...@apache.org> on 2023/03/14 22:29:26 UTC

[GitHub] [iceberg] Fokko opened a new pull request, #7109: Flink: Apply row level filtering

Fokko opened a new pull request, #7109:
URL: https://github.com/apache/iceberg/pull/7109

   For Flink, we apply partition pruning, filtering based on metrics, and row-group skipping, but no row-level filtering.
   
   See the issue for more details
   
   Resolves #7022
   
   @stevenzwu would you have time to take a peek at this one?
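To see why row-level filtering is still needed after partition pruning and metrics-based skipping, here is a minimal, JDK-only sketch. It models a data file as a hypothetical `DataFileStats` holder with one column's min/max bounds plus its rows; this is an illustration, not Iceberg's API. Metrics can only prove that *no* row in a file matches. A file whose bounds overlap the predicate must still be read, and the non-matching rows inside it are only dropped by a per-row check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a data file: min/max bounds of one string column plus its rows.
final class DataFileStats {
  final String min;
  final String max;
  final List<String> rows;

  DataFileStats(List<String> rows) {
    this.rows = rows;
    // Assumes a non-empty file; bounds are computed the way column metrics would be.
    this.min = rows.stream().min(String::compareTo).get();
    this.max = rows.stream().max(String::compareTo).get();
  }
}

final class RowLevelFilterDemo {
  // Metrics-based skipping for `col >= lower`: the file can be skipped only
  // if even its maximum value is below the lower bound.
  static boolean canSkip(DataFileStats file, String lower) {
    return file.max.compareTo(lower) < 0;
  }

  // Scans `col >= lower` with file skipping only, or with an extra per-row check.
  static List<String> scan(List<DataFileStats> files, String lower, boolean rowLevel) {
    Predicate<String> rowFilter = v -> v.compareTo(lower) >= 0;
    List<String> out = new ArrayList<>();
    for (DataFileStats file : files) {
      if (canSkip(file, lower)) {
        continue; // pruned using metrics, no rows read
      }
      for (String row : file.rows) {
        if (!rowLevel || rowFilter.test(row)) {
          out.add(row);
        }
      }
    }
    return out;
  }
}
```

With a file holding `a`, `b`, `c` and the predicate `>= 'b'`, the file survives skipping (its max is `c`), so without the per-row check `a` leaks into the result.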


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#issuecomment-1505284184

   @stevenzwu thanks again for the review, could you do another pass?




[GitHub] [iceberg] advancedxy commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1284027818


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -120,6 +138,9 @@ private CloseableIterable<RowData> newIterable(
       }
     }
 
+    if (rowFilter != null) {
+      return CloseableIterable.filter(iter, rowFilter::filter);
+    }

Review Comment:
   @Fokko @stevenzwu We have an internal request to add row filters dynamically; the current implementation requires the filter to be supplied at job startup time. After some investigation, I believe we may not have to pass the filters all the way down to this class. The task itself already carries the row filter, `task.residual()`, so we could simply convert it into a row filter here, for example:
   
   ```java
   // Replaces the rowFilter that is supplied at job startup:
   //    if (rowFilter != null) {
   //      return CloseableIterable.filter(iter, rowFilter::filter);
   //    }
   // Evaluate the residual filter that each task already carries instead.
   if (task.residual() != null && !task.residual().isEquivalentTo(Expressions.alwaysTrue())) {
     FlinkSourceFilter dataFilter =
         new FlinkSourceFilter(this.projectedSchema, task.residual(), this.caseSensitive);
     return CloseableIterable.filter(iter, dataFilter::filter);
   }
   ```
   
   WDYT?
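The idea of deriving the row filter from each task's residual can be sketched without Iceberg or Flink classes. In this minimal sketch, a hypothetical `ScanTask` carries a residual modeled as a `Predicate` plus an `alwaysTrue` flag, standing in for `task.residual().isEquivalentTo(Expressions.alwaysTrue())`. When the residual is trivially true, for example because partition pruning already proved every row in the task matches, the reader returns rows unfiltered and pays no per-row cost.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical scan task: its rows plus the residual filter left over after planning.
final class ScanTask<T> {
  final List<T> rows;
  final Predicate<T> residual;
  final boolean residualAlwaysTrue; // stands in for isEquivalentTo(alwaysTrue())

  ScanTask(List<T> rows, Predicate<T> residual, boolean residualAlwaysTrue) {
    this.rows = rows;
    this.residual = residual;
    this.residualAlwaysTrue = residualAlwaysTrue;
  }
}

final class ResidualReader {
  // Applies the task's own residual, so no filter has to be passed in at job startup.
  static <T> List<T> read(ScanTask<T> task) {
    if (task.residual == null || task.residualAlwaysTrue) {
      return task.rows; // nothing left to evaluate for this task
    }
    return task.rows.stream().filter(task.residual).collect(Collectors.toList());
  }
}
```

Because each task brings its own residual, tasks whose partitions fully satisfy the filter skip evaluation entirely, which is where the efficiency gain mentioned later in the thread comes from.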





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1163879630


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java:
##########
@@ -109,6 +111,49 @@ public void testProcessAllRecords() throws Exception {
     }
   }
 
+  @Test
+  public void testProcessAllRecordsFilter() throws Exception {

Review Comment:
   Let's remove this one then





[GitHub] [iceberg] Fokko commented on pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#issuecomment-1514261196

   Thanks @stevenzwu and @doki23 for the review!




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1170607215


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java:
##########
@@ -428,11 +433,47 @@ public void testFilterExp() throws Exception {
             RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
     helper.appendToTable(dataFile1, dataFile2);
     TestHelpers.assertRecords(
-        runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'"),
+        runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'", true),
         expectedRecords,
         TestFixtures.SCHEMA);
   }
 
+  @Test
+  public void testFilterExp() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
+    expectedRecords.get(0).set(0, "a");
+    expectedRecords.get(1).set(0, "b");
+    expectedRecords.get(2).set(0, "c");
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(expectedRecords);
+    helper.appendToTable(dataFile);
+    List<Row> actual =
+        runWithFilter(Expressions.greaterThanOrEqual("data", "b"), "where data>='b'", true);
+    TestHelpers.assertRecords(actual, expectedRecords.subList(1, 3), TestFixtures.SCHEMA);
+  }
+
+  @Test
+  public void testFilterExpCaseInsensitive() throws Exception {

Review Comment:
   nit: for now, maybe extract a private method to avoid test duplication. With JUnit 5, this can be handled better.





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1136295164


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final Schema schema;
+  private final Expression expr;
+  private final boolean caseSensitive;
+
+  private volatile RowDataWrapper wrapper;
+  private volatile Evaluator evaluator;
+
+  public FlinkSourceFilter(Schema schema, Expression expr, boolean caseSensitive) {
+    this.schema = schema;
+    this.expr = expr;
+    this.caseSensitive = caseSensitive;
+  }
+
+  @Override
+  public boolean filter(RowData value) throws Exception {
+    if (this.wrapper == null) {
+      RowType rowType = FlinkSchemaUtil.convert(schema);
+      Types.StructType struct = schema.asStruct();
+      this.wrapper = new RowDataWrapper(rowType, struct);

Review Comment:
   The wrapper is not serializable, so I decided to construct it lazily, when it is first needed.
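Flink user functions such as `FilterFunction` are `Serializable` and get shipped to the task managers, so a non-serializable helper cannot simply be a populated field. A minimal JDK-only sketch of the lazy pattern used in `FlinkSourceFilter`: the helper (a hypothetical `Wrapper`, standing in for `RowDataWrapper`) lives in a `transient` field and is built on first use, including after deserialization. The class names here are illustrative only.

```java
import java.io.Serializable;
import java.util.function.Predicate;

// Hypothetical helper that, like RowDataWrapper, is not Serializable.
final class Wrapper {
  final String prefix;

  Wrapper(String prefix) {
    this.prefix = prefix;
  }

  boolean matches(String row) {
    return row.startsWith(prefix);
  }
}

final class LazyFilter implements Predicate<String>, Serializable {
  private static final long serialVersionUID = 1L;

  private final String prefix;       // serializable configuration
  private transient Wrapper wrapper; // skipped by serialization, rebuilt on demand

  LazyFilter(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public boolean test(String row) {
    // This null check runs for every record, which is the cost raised later in the review.
    if (wrapper == null) {
      wrapper = new Wrapper(prefix);
    }
    return wrapper.matches(row);
  }
}
```

Marking the field `transient` keeps serialization safe even if the function is serialized after its first call; the diff itself declares the field `volatile`, which presumably works because the field is still `null` when the function is shipped.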





[GitHub] [iceberg] doki23 commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1140949052


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,55 @@
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final Schema schema;
+  private final Expression expr;
+  private final boolean caseSensitive;
+
+  private volatile RowDataWrapper wrapper;
+  private volatile Evaluator evaluator;
+
+  public FlinkSourceFilter(Schema schema, Expression expr, boolean caseSensitive) {
+    this.schema = schema;
+    this.expr = expr;
+    this.caseSensitive = caseSensitive;
+  }
+
+  @Override
+  public boolean filter(RowData value) throws Exception {
+    if (this.wrapper == null) {
+      RowType rowType = FlinkSchemaUtil.convert(schema);
+      Types.StructType struct = schema.asStruct();
+      this.wrapper = new RowDataWrapper(rowType, struct);

Review Comment:
   Is it possible to make the wrapper serializable?





[GitHub] [iceberg] doki23 commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1140949729


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
         if (env.getMaxParallelism() > 0) {
           parallelism = Math.min(parallelism, env.getMaxParallelism());
         }
-        return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        DataStreamSource<RowData> source =
+            env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        // Apply row filtering if required
+        if (!context.filters().isEmpty()) {

Review Comment:
   Could it be null if we call `.filters(null)`?





[GitHub] [iceberg] Fokko commented on pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#issuecomment-1502211699

   @stevenzwu Sorry for the long wait. I've updated the PR according to your suggestion. Looking at the FLIP-27 tests, I think row filtering is already applied there. I also took the liberty of changing some public methods, since Flink 1.17 hasn't been released to the public yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#issuecomment-1517743273

   @stevenzwu sure thing! Here you go: https://github.com/apache/iceberg/pull/7397




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1142865328


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
         if (env.getMaxParallelism() > 0) {
           parallelism = Math.min(parallelism, env.getMaxParallelism());
         }
-        return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        DataStreamSource<RowData> source =

Review Comment:
   Another possible common place to evaluate the residual filter is probably `RowDataFileScanTaskReader`. This approach has the added benefit of not changing the shape of the Flink DAG: users won't see an extra filter function/operator and wonder where it comes from.
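Evaluating the filter inside the reader means wrapping the record iterator rather than adding a `filter()` operator to the DataStream, so the job graph keeps its shape. Iceberg's `CloseableIterable.filter`, which the diff in this thread uses, does this for closeable iterables; the following is a minimal JDK-only sketch of the same lazy-skipping idea over a plain `Iterator`. The class name `FilteringIterator` is hypothetical.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Lazily skips records that fail the predicate; no extra operator is needed downstream.
final class FilteringIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final Predicate<? super T> predicate;
  private T next;
  private boolean hasNext;

  FilteringIterator(Iterator<T> delegate, Predicate<? super T> predicate) {
    this.delegate = delegate;
    this.predicate = predicate;
    advance();
  }

  // Moves to the next matching record, if there is one.
  private void advance() {
    hasNext = false;
    while (delegate.hasNext()) {
      T candidate = delegate.next();
      if (predicate.test(candidate)) {
        next = candidate;
        hasNext = true;
        return;
      }
    }
    next = null;
  }

  @Override
  public boolean hasNext() {
    return hasNext;
  }

  @Override
  public T next() {
    if (!hasNext) {
      throw new NoSuchElementException();
    }
    T result = next;
    advance();
    return result;
  }
}
```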





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1141491966


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,55 @@
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final Schema schema;
+  private final Expression expr;
+  private final boolean caseSensitive;
+
+  private volatile RowDataWrapper wrapper;
+  private volatile Evaluator evaluator;
+
+  public FlinkSourceFilter(Schema schema, Expression expr, boolean caseSensitive) {
+    this.schema = schema;
+    this.expr = expr;
+    this.caseSensitive = caseSensitive;
+  }
+
+  @Override
+  public boolean filter(RowData value) throws Exception {
+    if (this.wrapper == null) {
+      RowType rowType = FlinkSchemaUtil.convert(schema);
+      Types.StructType struct = schema.asStruct();
+      this.wrapper = new RowDataWrapper(rowType, struct);

Review Comment:
   It is, but it requires some more changes. I've pushed a commit that includes those changes.





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1141491830


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
         if (env.getMaxParallelism() > 0) {
           parallelism = Math.min(parallelism, env.getMaxParallelism());
         }
-        return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        DataStreamSource<RowData> source =
+            env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        // Apply row filtering if required
+        if (!context.filters().isEmpty()) {

Review Comment:
   Good catch, added the check
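The concern here is that `.filters(null)` would make `context.filters().isEmpty()` throw a `NullPointerException`. A minimal sketch of a null-safe guard, matching the `filters != null && !filters.isEmpty()` check that appears in the `RowDataFileScanTaskReader` diff in this thread; the helper name `hasFilters` is hypothetical.

```java
import java.util.Collection;

final class FilterGuards {
  // Treats a null list the same as an empty one, so callers never dereference null.
  static boolean hasFilters(Collection<?> filters) {
    return filters != null && !filters.isEmpty();
  }
}
```

Normalizing `null` to an empty list inside the builder's `filters(...)` setter would be another option; which approach the PR actually took is not shown here.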





[GitHub] [iceberg] doki23 commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1144509686


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final RowDataWrapper wrapper;

Review Comment:
   Lazy initialization is OK, but making it serializable makes sense: that way we don't need to evaluate the `if` check for every record.
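The diff under review declares the wrapper as a `private final` field, built once. For comparison, another way to drop the per-record null check while keeping a helper that is not serializable is sketched below in plain Java, as an assumption rather than what the PR did: build the helper eagerly in the constructor and rebuild it in a private `readObject` hook after deserialization. `Helper` and `EagerFilter` are hypothetical names.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.function.Predicate;

// Hypothetical non-serializable helper, standing in for RowDataWrapper.
final class Helper {
  final int threshold;

  Helper(int threshold) {
    this.threshold = threshold;
  }

  boolean accept(int value) {
    return value >= threshold;
  }
}

final class EagerFilter implements Predicate<Integer>, Serializable {
  private static final long serialVersionUID = 1L;

  private final int threshold;
  private transient Helper helper; // not serialized; restored in readObject

  EagerFilter(int threshold) {
    this.threshold = threshold;
    this.helper = new Helper(threshold);
  }

  // Runs once per deserialization instead of once per record.
  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    this.helper = new Helper(threshold);
  }

  @Override
  public boolean test(Integer value) {
    return helper.accept(value); // no null check on the hot path
  }
}
```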





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1142850082


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
         if (env.getMaxParallelism() > 0) {
           parallelism = Math.min(parallelism, env.getMaxParallelism());
         }
-        return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        DataStreamSource<RowData> source =

Review Comment:
   This covers one scenario, but there are two others. I am wondering if we should fix it at the lower level, in the Flink readers (Avro, Parquet, ORC).
   
   1) Using `FlinkInputFormat` directly, e.g. in `StreamingReaderOperator`:
   ```
     private void processSplits() throws IOException {
       FlinkInputSplit split = splits.poll();
       if (split == null) {
         currentSplitState = SplitState.IDLE;
         return;
       }
   
       format.open(split);
       try {
         RowData nextElement = null;
         while (!format.reachedEnd()) {
           nextElement = format.nextRecord(nextElement);
           sourceContext.collect(nextElement);
         }
       } finally {
         currentSplitState = SplitState.IDLE;
         format.close();
       }
   
       // Re-schedule to process the next split.
       enqueueProcessSplits();
     }
   ```
   
   2) The new Flink FLIP-27 `IcebergSource`. Here is an example from `IcebergTableSource` that shows how users can construct the DataStream. We can fix it in `IcebergTableSource`, but we can't control users' code to add the filter to the `DataStream`.
   ```
     private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
       SplitAssignerType assignerType =
           readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
       IcebergSource<RowData> source =
           IcebergSource.forRowData()
               .tableLoader(loader)
               .assignerFactory(assignerType.factory())
               .properties(properties)
               .project(getProjectedSchema())
               .limit(limit)
               .filters(filters)
               .flinkConfig(readableConfig)
               .build();
       DataStreamSource stream =
           env.fromSource(
               source,
               WatermarkStrategy.noWatermarks(),
               source.name(),
               TypeInformation.of(RowData.class));
       return stream;
     }
   ```
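Pushing the filter down into the reader means every caller of a `reachedEnd()`/`nextRecord()` loop like the `processSplits()` excerpt above sees only matching rows without changing its own code. Because `reachedEnd()` must answer before the next matching row is known, the format has to look ahead. A minimal JDK-only sketch with a hypothetical `FilteringFormat` class; it mimics the shape of that loop, not Flink's actual `InputFormat` interface.

```java
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical format with InputFormat-like reachedEnd()/nextRecord() methods.
final class FilteringFormat<T> {
  private final Iterator<T> records;
  private final Predicate<? super T> filter;
  // Next matching record found by looking ahead; null means "nothing buffered",
  // so this sketch does not support null records.
  private T buffered;

  FilteringFormat(Iterator<T> records, Predicate<? super T> filter) {
    this.records = records;
    this.filter = filter;
  }

  // Looks ahead until a matching record is buffered or the input is exhausted.
  boolean reachedEnd() {
    while (buffered == null && records.hasNext()) {
      T candidate = records.next();
      if (filter.test(candidate)) {
        buffered = candidate;
      }
    }
    return buffered == null;
  }

  // Callers check reachedEnd() first, as in the processSplits() loop.
  T nextRecord() {
    if (reachedEnd()) {
      throw new IllegalStateException("no more records");
    }
    T result = buffered;
    buffered = null;
    return result;
  }
}
```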





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1285211584


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -120,6 +138,9 @@ private CloseableIterable<RowData> newIterable(
       }
     }
 
+    if (rowFilter != null) {
+      return CloseableIterable.filter(iter, rowFilter::filter);
+    }

Review Comment:
   @advancedxy That sounds like a great approach. If we only have to apply the residual, then we're also more efficient. That would be an improvement over the current approach.





[GitHub] [iceberg] stevenzwu commented on pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#issuecomment-1516588617

   @Fokko can you also create the backport PR for 1.15 and 1.16?




[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1169854299


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -54,13 +58,27 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
+  private final FlinkSourceFilter rowFilter;
 
   public RowDataFileScanTaskReader(
-      Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
+      Schema tableSchema,
+      Schema projectedSchema,
+      String nameMapping,
+      boolean caseSensitive,
+      List<Expression> filters) {
     this.tableSchema = tableSchema;
     this.projectedSchema = projectedSchema;
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
+
+    if (filters != null && !filters.isEmpty()) {
+      Expression combinedExpression =
+          filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);

Review Comment:
   I fully agree. Good to pick this up in a separate PR.




[GitHub] [iceberg] Fokko merged pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko merged PR #7109:
URL: https://github.com/apache/iceberg/pull/7109



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1164477096


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java:
##########
@@ -433,6 +433,24 @@ public void testFilterExp() throws Exception {
         TestFixtures.SCHEMA);
   }
 
+  @Test
+  public void testFilterExp() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
+    expectedRecords.get(0).set(2, "a");
+    expectedRecords.get(1).set(2, "b");
+    expectedRecords.get(2).set(2, "c");
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(expectedRecords);
+    helper.appendToTable(dataFile);
+    List<Row> actual =
+        runWithFilter(Expressions.greaterThanOrEqual("data", "b"), "where data>='b'");
+    TestHelpers.assertRecords(actual, expectedRecords, TestFixtures.SCHEMA);

Review Comment:
   shouldn't the filter match only 2 rows (not all 3 rows)?
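   One way to make the assertion match the filter is to derive the expected rows from the written ones using the same predicate. A minimal plain-Java sketch, modeling only the `data` column as strings (the class and method names are hypothetical, not part of the Iceberg test suite):

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: only the "data" column is modeled, as plain strings.
final class ExpectedRowsSketch {

  // Keep only the rows that `data >= lowerBound` should return.
  static List<String> expectedFor(List<String> written, String lowerBound) {
    return written.stream()
        .filter(data -> data.compareTo(lowerBound) >= 0)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> written = List.of("a", "b", "c");
    System.out.println(expectedFor(written, "b")); // prints [b, c]
  }
}
```

   With the three written values `a`, `b`, `c`, only two rows survive `data >= 'b'`.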



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java:
##########
@@ -140,6 +144,43 @@ public void testBasicProjection() throws IOException {
     TestHelpers.assertRows(result, expected);
   }
 
+  public static Record createRecord(Schema writeSchema, long val) {
+    Record record = GenericRecord.create(writeSchema);
+    record.set(0, val);
+    return record;
+  }
+
+  @Test
+  public void testBasicFormatFiltering() throws IOException {

Review Comment:
   This test is also unnecessary, right? `TestFlinkInputFormat` extends `TestFlinkScan` too.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -54,13 +58,27 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
+  private final FlinkSourceFilter rowFilter;
 
   public RowDataFileScanTaskReader(
-      Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
+      Schema tableSchema,
+      Schema projectedSchema,
+      String nameMapping,
+      boolean caseSensitive,
+      List<Expression> filters) {
     this.tableSchema = tableSchema;
     this.projectedSchema = projectedSchema;
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
+
+    if (filters != null && !filters.isEmpty()) {
+      Expression combinedExpression =
+          filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);

Review Comment:
   FlinkSource should have taken a single expression (not a list) as the filter. Created an issue to track: https://github.com/apache/iceberg/issues/7335
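   The combining step in the diff folds the list with `Expressions::and`, starting from `Expressions.alwaysTrue()`, which is why a single expression carries the same information as a list. A minimal sketch of that fold, using `java.util.function.Predicate` as a stand-in for Iceberg's `Expression` (an analogy, not the Iceberg API):

```java
import java.util.List;
import java.util.function.Predicate;

// Predicate<T> is an analogue of Iceberg's Expression here;
// x -> true plays the role of Expressions.alwaysTrue().
final class CombineFiltersSketch {

  static <T> Predicate<T> combine(List<Predicate<T>> filters) {
    Predicate<T> alwaysTrue = x -> true;
    if (filters == null) {
      return alwaysTrue;
    }
    // Same shape as filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and)
    return filters.stream().reduce(alwaysTrue, Predicate::and);
  }

  public static void main(String[] args) {
    List<Predicate<Long>> filters = List.of(id -> id >= 0L, id -> id < 10L);
    Predicate<Long> combined = combine(filters);
    System.out.println(combined.test(5L)); // true
    System.out.println(combined.test(42L)); // false
  }
}
```

   An empty (or null) list reduces to the always-true identity, so callers could just as well pass one expression.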




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1170300376


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java:
##########
@@ -433,6 +433,24 @@ public void testFilterExp() throws Exception {
         TestFixtures.SCHEMA);
   }
 
+  @Test
+  public void testFilterExp() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
+    expectedRecords.get(0).set(0, "a");
+    expectedRecords.get(1).set(0, "b");
+    expectedRecords.get(2).set(0, "c");
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(expectedRecords);
+    helper.appendToTable(dataFile);
+    List<Row> actual =
+        runWithFilter(Expressions.greaterThanOrEqual("data", "b"), "where data>='b'");

Review Comment:
   Ideally, we would also test the case-insensitive option together with the filter, but the current test structure makes that very hard: we need both a filter and options.
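   As a rough model of what such a test would exercise: the case-sensitivity flag decides whether a filter's column name must match a schema field exactly or only when ignoring case. The sketch below uses a hypothetical helper, not Iceberg's binding code, and the field names are just examples:

```java
import java.util.List;

// Hypothetical model of resolving a filter's column name against schema field names.
final class CaseSensitiveBindingSketch {

  // Returns the position of the referenced column, or throws if it cannot be found.
  static int bindField(List<String> fieldNames, String ref, boolean caseSensitive) {
    for (int i = 0; i < fieldNames.size(); i++) {
      String name = fieldNames.get(i);
      boolean matches = caseSensitive ? name.equals(ref) : name.equalsIgnoreCase(ref);
      if (matches) {
        return i;
      }
    }
    throw new IllegalArgumentException("Cannot find field '" + ref + "'");
  }

  public static void main(String[] args) {
    List<String> schema = List.of("data", "id", "dt");
    System.out.println(bindField(schema, "DATA", false)); // 0
    try {
      bindField(schema, "DATA", true);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Cannot find field 'DATA'
    }
  }
}
```

   A test covering both paths would run the same upper-cased filter once with case sensitivity on (expecting a failure) and once with it off (expecting rows).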




[GitHub] [iceberg] doki23 commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1140949729


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
         if (env.getMaxParallelism() > 0) {
           parallelism = Math.min(parallelism, env.getMaxParallelism());
         }
-        return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        DataStreamSource<RowData> source =
+            env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        // Apply row filtering if required
+        if (!context.filters().isEmpty()) {

Review Comment:
   Would this cause an NPE if we set `.filters(null)`?
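   Two common ways to rule out the NPE are to guard the call site with `filters != null && !filters.isEmpty()` (as `RowDataFileScanTaskReader` does in this PR) or to normalize `null` to an empty list in the builder setter. A minimal sketch of the second option, with a hypothetical builder and filters modeled as strings:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical builder; the names do not mirror Iceberg's FlinkSource.Builder.
final class NullSafeFiltersSketch {

  private List<String> filters = Collections.emptyList();

  // Normalize null to an empty list so later isEmpty() checks cannot throw an NPE.
  NullSafeFiltersSketch filters(List<String> newFilters) {
    this.filters = newFilters == null ? Collections.emptyList() : newFilters;
    return this;
  }

  boolean needsRowFilter() {
    return !filters.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(new NullSafeFiltersSketch().filters(null).needsRowFilter()); // false
    System.out.println(
        new NullSafeFiltersSketch().filters(List.of("data >= 'b'")).needsRowFilter()); // true
  }
}
```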




[GitHub] [iceberg] doki23 commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1140949729


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
         if (env.getMaxParallelism() > 0) {
           parallelism = Math.min(parallelism, env.getMaxParallelism());
         }
-        return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        DataStreamSource<RowData> source =
+            env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        // Apply row filtering if required
+        if (!context.filters().isEmpty()) {

Review Comment:
   Could it be null?




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1142846858


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final RowDataWrapper wrapper;

Review Comment:
   We don't have to make `RowDataWrapper` serializable; it can be lazily initialized.



##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final RowDataWrapper wrapper;

Review Comment:
   We don't have to make `RowDataWrapper` serializable; it can be lazily initialized.
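   A minimal sketch of the lazy-initialization pattern in plain Java: the filter is `Serializable` (Flink's `FilterFunction` is), while the helper it wraps is not. Marking the helper field `transient` and creating it on first use means serializing the filter never touches the helper. `Wrapper` is a hypothetical stand-in for `RowDataWrapper`, and the `transient` modifier is this sketch's choice, not necessarily what the PR does:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a serializable filter function.
final class LazyFilterSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // Hypothetical helper that is NOT Serializable, standing in for RowDataWrapper.
  private static final class Wrapper {
    long unwrap(long[] row) {
      return row[0];
    }
  }

  private final long lowerBound;
  // transient: never serialized; rebuilt on first use after deserialization.
  private transient Wrapper wrapper;

  LazyFilterSketch(long lowerBound) {
    this.lowerBound = lowerBound;
  }

  boolean filter(long[] row) {
    if (wrapper == null) {
      this.wrapper = new Wrapper();
    }
    return wrapper.unwrap(row) >= lowerBound;
  }

  // Serializes and deserializes the filter, as shipping it to a task would.
  static LazyFilterSketch roundTrip(LazyFilterSketch f) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(f);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (LazyFilterSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    LazyFilterSketch f = new LazyFilterSketch(0L);
    System.out.println(f.filter(new long[] {5L})); // true; wrapper is now initialized
    LazyFilterSketch copy = roundTrip(f); // works because wrapper is transient
    System.out.println(copy.filter(new long[] {-1L})); // false
  }
}
```

   The sketch also follows the convention of using `this.` only on assignment.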




[GitHub] [iceberg] stevenzwu commented on pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#issuecomment-1516587130

   @Fokko thx for fixing this issue



[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1163747072


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final RowType rowType;
+  private final Evaluator evaluator;
+  private final Types.StructType struct;
+  private volatile RowDataWrapper wrapper;
+
+  public FlinkSourceFilter(Schema schema, Expression expr, boolean caseSensitive) {
+    rowType = FlinkSchemaUtil.convert(schema);
+    struct = schema.asStruct();
+    this.evaluator = new Evaluator(struct, expr, caseSensitive);
+  }
+
+  @Override
+  public boolean filter(RowData value) {
+    if (this.wrapper == null) {

Review Comment:
   I wasn't aware of that, thanks! I've updated the code 👍🏻 




[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1161990618


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final RowDataWrapper wrapper;

Review Comment:
   I'm open to both. It is just one null check.




[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1169853525


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java:
##########
@@ -433,6 +433,24 @@ public void testFilterExp() throws Exception {
         TestFixtures.SCHEMA);
   }
 
+  @Test
+  public void testFilterExp() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
+    expectedRecords.get(0).set(2, "a");
+    expectedRecords.get(1).set(2, "b");
+    expectedRecords.get(2).set(2, "c");
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(expectedRecords);
+    helper.appendToTable(dataFile);
+    List<Row> actual =
+        runWithFilter(Expressions.greaterThanOrEqual("data", "b"), "where data>='b'");
+    TestHelpers.assertRecords(actual, expectedRecords, TestFixtures.SCHEMA);

Review Comment:
   You are correct! The code was working, but the test wasn't good. Fixed it!




[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1164141140


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java:
##########
@@ -140,6 +148,90 @@ public void testBasicProjection() throws IOException {
     TestHelpers.assertRows(result, expected);
   }
 
+  @Test
+  public void testBasicFormatFiltering() throws IOException {
+    Schema writeSchema = new Schema(Types.NestedField.required(0, "id", Types.LongType.get()));
+
+    Table sourceTable =
+        catalogResource.catalog().createTable(TableIdentifier.of("default", "t"), writeSchema);
+
+    List<Record> writeRecords = RandomGenericData.generate(writeSchema, 22, 0L);
+    new GenericAppenderHelper(sourceTable, fileFormat, TEMPORARY_FOLDER)
+        .appendToTable(writeRecords);
+
+    List<Row> result =
+        runFormat(
+            FlinkSource.forRowData()
+                .tableLoader(tableLoader())
+                .filters(Collections.singletonList(Expressions.greaterThanOrEqual("id", 0)))
+                .buildFormat());
+
+    List<Row> expected =
+        Lists.newArrayList(
+            Row.of(170168523965373507L),
+            Row.of(236240913033168047L),
+            Row.of(6146794652083548235L),
+            Row.of(5072005423257391728L),
+            Row.of(171134583860878546L),
+            Row.of(8730854458729406051L),
+            Row.of(6688467811848818630L),
+            Row.of(428667830982598836L),
+            Row.of(9223372036854775807L),
+            Row.of(4922475540349336432L),
+            Row.of(7138230367502298321L),
+            Row.of(6787954838522539928L),
+            Row.of(9223372036854775807L),
+            Row.of(2440897930508784356L),
+            Row.of(282712201594543727L));
+
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testBasicRowDataFiltering() throws Exception {

Review Comment:
   I've added a test in `TestFlinkScan` that filters on a non-partitioned column.




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1284732661


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -120,6 +138,9 @@ private CloseableIterable<RowData> newIterable(
       }
     }
 
+    if (rowFilter != null) {
+      return CloseableIterable.filter(iter, rowFilter::filter);
+    }

Review Comment:
   > I believe maybe we don't have to pass the filters all the way down to this class. The task itself already has the row filter: `task.residual()`.
   
   if that is the case, it is probably simpler.
   
   > add row filter dynamically,
   
   Can you explain what the dynamic part is and how it is related to the residual filter?




[GitHub] [iceberg] advancedxy commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "advancedxy (via GitHub)" <gi...@apache.org>.
advancedxy commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1285164750


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -120,6 +138,9 @@ private CloseableIterable<RowData> newIterable(
       }
     }
 
+    if (rowFilter != null) {
+      return CloseableIterable.filter(iter, rowFilter::filter);
+    }

Review Comment:
   > Can you explain what the dynamic part is and how it is related to the residual filter?
   
   The feature we are developing needs to add some additional scan filters during the `planIcebergSourceSplits` phase. The additional scan filter is added to `scanContext`, so it becomes part of `task.residual()`.
   
   The additional filter itself is dynamically added.
   
   > if that is the case, it is probably simpler.
   
   Then, if we can just use `task.residual()`, do you think we should refactor this PR to use it? Is it possible to revert the class interface change in this PR?
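   A rough sketch of the `task.residual()` approach under simplified, hypothetical types: the planner folds any filter added at planning time into the task's residual, and the reader only applies whatever residual the task carries, so its constructor needs no filter argument. `Predicate` stands in for Iceberg's `Expression`, and `ScanTask.residual` for `FileScanTask.residual()`:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class ResidualFilterSketch {

  // Hypothetical task: the rows to read plus the residual filter left after planning.
  static final class ScanTask {
    final List<Long> rows;
    final Predicate<Long> residual;

    ScanTask(List<Long> rows, Predicate<Long> residual) {
      this.rows = rows;
      this.residual = residual;
    }
  }

  // Planner: the scan filter and any filter added at planning time end up in the residual.
  static ScanTask plan(List<Long> rows, Predicate<Long> scanFilter, Predicate<Long> extraFilter) {
    return new ScanTask(rows, scanFilter.and(extraFilter));
  }

  // Reader: takes no filter argument; it just applies the task's residual.
  static List<Long> read(ScanTask task) {
    return task.rows.stream().filter(task.residual).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    ScanTask task = plan(List.of(-2L, 3L, 8L, 15L), id -> id >= 0L, id -> id < 10L);
    System.out.println(read(task)); // [3, 8]
  }
}
```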




[GitHub] [iceberg] doki23 commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "doki23 (via GitHub)" <gi...@apache.org>.
doki23 commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1140949729


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
         if (env.getMaxParallelism() > 0) {
           parallelism = Math.min(parallelism, env.getMaxParallelism());
         }
-        return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        DataStreamSource<RowData> source =
+            env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        // Apply row filtering if required
+        if (!context.filters().isEmpty()) {

Review Comment:
   This would cause an NPE if we set `.filters(null)`.




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1162268348


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java:
##########
@@ -140,6 +148,90 @@ public void testBasicProjection() throws IOException {
     TestHelpers.assertRows(result, expected);
   }
 
+  @Test
+  public void testBasicFormatFiltering() throws IOException {
+    Schema writeSchema = new Schema(Types.NestedField.required(0, "id", Types.LongType.get()));
+
+    Table sourceTable =
+        catalogResource.catalog().createTable(TableIdentifier.of("default", "t"), writeSchema);
+
+    List<Record> writeRecords = RandomGenericData.generate(writeSchema, 22, 0L);
+    new GenericAppenderHelper(sourceTable, fileFormat, TEMPORARY_FOLDER)
+        .appendToTable(writeRecords);
+
+    List<Row> result =
+        runFormat(
+            FlinkSource.forRowData()
+                .tableLoader(tableLoader())
+                .filters(Collections.singletonList(Expressions.greaterThanOrEqual("id", 0)))
+                .buildFormat());
+
+    List<Row> expected =

Review Comment:
   It is not intuitive to interpret the expected records because we are using `RandomGenericData` above. We can just deterministically generate the `GenericRecord`s in a for loop and set the id value to the index number. Then the expected values are easier to understand.
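   A plain-Java sketch of that suggestion, modeling only the id values (in the real test each id would be set on a `GenericRecord` created from `writeSchema`). The helper names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Ids are the loop index, so the rows matching `id >= lowerBound` are obvious.
final class DeterministicRecordsSketch {

  static List<Long> writeIds(int count) {
    List<Long> ids = new ArrayList<>();
    for (long i = 0; i < count; i++) {
      ids.add(i);
    }
    return ids;
  }

  static List<Long> expectedAtLeast(List<Long> ids, long lowerBound) {
    return ids.stream().filter(id -> id >= lowerBound).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Long> ids = writeIds(22);
    System.out.println(expectedAtLeast(ids, 20L)); // [20, 21]
  }
}
```

   With ids `0..21`, the expected rows for any bound can be read off directly; `id >= 0` would keep all 22 rows.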



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -120,6 +130,20 @@ private CloseableIterable<RowData> newIterable(
       }
     }
 
+    if (this.filters != null && !this.filters.isEmpty()) {
+      Expression combinedExpression = null;

Review Comment:
   nit: should we move the `FlinkSourceFilter` construction to the constructor? If it is not null, we apply the additional filter on the iterable.
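   A minimal sketch of that shape in plain Java: the combined filter is built once in the constructor (left `null` when there are no filters), and `newIterable` only adds a lazy filtering step when it is non-null, similar in spirit to wrapping with `CloseableIterable.filter`. `ReaderWithRowFilter` and its `Long` rows are hypothetical:

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical reader: builds the row filter once, in the constructor.
final class ReaderWithRowFilter {
  private final Predicate<Long> rowFilter; // null when no filters were given

  ReaderWithRowFilter(List<Predicate<Long>> filters) {
    if (filters != null && !filters.isEmpty()) {
      this.rowFilter = filters.stream().reduce(x -> true, Predicate::and);
    } else {
      this.rowFilter = null;
    }
  }

  // Returns the rows unchanged, or a lazily filtered view when a filter exists.
  Iterable<Long> newIterable(List<Long> rows) {
    if (rowFilter == null) {
      return rows;
    }
    return () -> rows.stream().filter(rowFilter).iterator();
  }

  public static void main(String[] args) {
    List<Long> rows = List.of(1L, 5L, 9L);
    List<Predicate<Long>> filters = List.of(id -> id >= 5L);
    new ReaderWithRowFilter(filters).newIterable(rows).forEach(System.out::println); // 5, 9
    new ReaderWithRowFilter(null).newIterable(rows).forEach(System.out::println); // 1, 5, 9
  }
}
```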



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -72,7 +73,8 @@ public AvroGenericRecordReaderFunction(
     this.io = io;
     this.encryption = encryption;
     this.rowDataReader =
-        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive);
+        new RowDataFileScanTaskReader(
+            tableSchema, readSchema, nameMapping, caseSensitive, Collections.emptyList());

Review Comment:
   this constructor needs to take the filters and pass the arg here.



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingReaderOperator.java:
##########
@@ -109,6 +111,49 @@ public void testProcessAllRecords() throws Exception {
     }
   }
 
+  @Test
+  public void testProcessAllRecordsFilter() throws Exception {

Review Comment:
   This test seems redundant with what `TestFlinkInputFormat` already has.



##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+  private final RowType rowType;
+  private final Evaluator evaluator;
+  private final Types.StructType struct;
+  private volatile RowDataWrapper wrapper;
+
+  public FlinkSourceFilter(Schema schema, Expression expr, boolean caseSensitive) {
+    rowType = FlinkSchemaUtil.convert(schema);
+    struct = schema.asStruct();
+    this.evaluator = new Evaluator(struct, expr, caseSensitive);
+  }
+
+  @Override
+  public boolean filter(RowData value) {
+    if (this.wrapper == null) {

Review Comment:
   nit: I thought the style is to use `this.` only when assigning a new value to a field.
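   To illustrate the convention the comment describes (`this.` on field assignment, plain names on reads), here is a stdlib-only sketch of the lazily initialized wrapper. `ReusableWrapper` and the `LongPredicate` evaluator are hypothetical stand-ins for `RowDataWrapper` and Iceberg's `Evaluator`. Note that the constructor in the diff also mixes both forms (`rowType = ...` and `struct = ...` next to `this.evaluator = ...`).

   ```java
   import java.util.function.LongPredicate;

   // Hypothetical stand-in for RowDataWrapper: a reusable view over the current row.
   final class ReusableWrapper {
     private long current;

     ReusableWrapper wrap(long row) {
       this.current = row; // assignment: uses `this.`
       return this;
     }

     long get() {
       return current; // read: no `this.`
     }
   }

   // Hypothetical stand-in for FlinkSourceFilter following that convention.
   final class SketchSourceFilter {
     private final LongPredicate evaluator; // stands in for Iceberg's Evaluator
     private volatile ReusableWrapper wrapper;

     SketchSourceFilter(LongPredicate evaluator) {
       this.evaluator = evaluator;
     }

     boolean filter(long value) {
       if (wrapper == null) {
         // assignment to a field: `this.`
         this.wrapper = new ReusableWrapper();
       }
       // reads of fields: plain names
       return evaluator.test(wrapper.wrap(value).get());
     }
   }
   ```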



##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java:
##########
@@ -140,6 +148,90 @@ public void testBasicProjection() throws IOException {
     TestHelpers.assertRows(result, expected);
   }
 
+  @Test
+  public void testBasicFormatFiltering() throws IOException {
+    Schema writeSchema = new Schema(Types.NestedField.required(0, "id", Types.LongType.get()));
+
+    Table sourceTable =
+        catalogResource.catalog().createTable(TableIdentifier.of("default", "t"), writeSchema);
+
+    List<Record> writeRecords = RandomGenericData.generate(writeSchema, 22, 0L);
+    new GenericAppenderHelper(sourceTable, fileFormat, TEMPORARY_FOLDER)
+        .appendToTable(writeRecords);
+
+    List<Row> result =
+        runFormat(
+            FlinkSource.forRowData()
+                .tableLoader(tableLoader())
+                .filters(Collections.singletonList(Expressions.greaterThanOrEqual("id", 0)))
+                .buildFormat());
+
+    List<Row> expected =
+        Lists.newArrayList(
+            Row.of(170168523965373507L),
+            Row.of(236240913033168047L),
+            Row.of(6146794652083548235L),
+            Row.of(5072005423257391728L),
+            Row.of(171134583860878546L),
+            Row.of(8730854458729406051L),
+            Row.of(6688467811848818630L),
+            Row.of(428667830982598836L),
+            Row.of(9223372036854775807L),
+            Row.of(4922475540349336432L),
+            Row.of(7138230367502298321L),
+            Row.of(6787954838522539928L),
+            Row.of(9223372036854775807L),
+            Row.of(2440897930508784356L),
+            Row.of(282712201594543727L));
+
+    TestHelpers.assertRows(result, expected);
+  }
+
+  @Test
+  public void testBasicRowDataFiltering() throws Exception {

Review Comment:
   This method won't be necessary if we add the new test method above to `TestFlinkScan`. Currently, `TestFlinkScan#testFilterExp` only covers filters on a partition column. We can add the above test method as `testResidualFilter`, where the filter is constructed on a non-partition column.
   
   `TestFlinkScan` is the base class for both the old `FlinkSource` (covered by `TestFlinkInputFormat` and others) and the new FLIP-27 `IcebergSource` (covered by `TestIcebergSourceBounded` and others).
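   A stdlib-only sketch of that structure, with hypothetical class names: the shared `testResidualFilter` is written once in an abstract base (playing the role of `TestFlinkScan`), and each concrete subclass only supplies how rows are read. With JUnit, `@Test` methods declared on an abstract base class run for every concrete subclass; here the test is a plain method so the sketch has no dependencies.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.LongPredicate;

   // Hypothetical base class: the shared test is written once.
   abstract class ScanTestBase {
     // Reads all rows through a particular source, applying the given filter.
     protected abstract List<Long> runWithFilter(List<Long> tableRows, LongPredicate filter);

     // A filter on a non-partition column can't be resolved by partition pruning
     // alone, so the residual predicate has to be applied to individual rows.
     final void testResidualFilter() {
       List<Long> rows = List.of(0L, 1L, 2L, 3L, 4L);
       List<Long> actual = runWithFilter(rows, id -> id >= 2);
       if (!actual.equals(List.of(2L, 3L, 4L))) {
         throw new AssertionError("unexpected rows: " + actual);
       }
     }

     // Row-level filtering shared by the stand-in sources below.
     static List<Long> filterRows(List<Long> rows, LongPredicate filter) {
       List<Long> kept = new ArrayList<>();
       for (long row : rows) {
         if (filter.test(row)) {
           kept.add(row);
         }
       }
       return kept;
     }
   }

   // Stand-in for the FlinkSource path (TestFlinkInputFormat and others).
   final class LegacySourceScanTest extends ScanTestBase {
     @Override
     protected List<Long> runWithFilter(List<Long> tableRows, LongPredicate filter) {
       return filterRows(tableRows, filter);
     }
   }

   // Stand-in for the FLIP-27 IcebergSource path (TestIcebergSourceBounded and others).
   final class Flip27SourceScanTest extends ScanTestBase {
     @Override
     protected List<Long> runWithFilter(List<Long> tableRows, LongPredicate filter) {
       return filterRows(tableRows, filter);
     }
   }
   ```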
   





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1169853914


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java:
##########
@@ -140,6 +144,43 @@ public void testBasicProjection() throws IOException {
     TestHelpers.assertRows(result, expected);
   }
 
+  public static Record createRecord(Schema writeSchema, long val) {
+    Record record = GenericRecord.create(writeSchema);
+    record.set(0, val);
+    return record;
+  }
+
+  @Test
+  public void testBasicFormatFiltering() throws IOException {

Review Comment:
   Yes, this one can go! 👍🏻 





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1170429093


##########
flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java:
##########
@@ -433,6 +433,24 @@ public void testFilterExp() throws Exception {
         TestFixtures.SCHEMA);
   }
 
+  @Test
+  public void testFilterExp() throws Exception {
+    Table table =
+        catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+    List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
+    expectedRecords.get(0).set(0, "a");
+    expectedRecords.get(1).set(0, "b");
+    expectedRecords.get(2).set(0, "c");
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+    DataFile dataFile = helper.writeFile(expectedRecords);
+    helper.appendToTable(dataFile);
+    List<Row> actual =
+        runWithFilter(Expressions.greaterThanOrEqual("data", "b"), "where data>='b'");

Review Comment:
   That's a good point. I got it working quite easily when using the expression. Do you know whether Flink SQL supports a case-insensitive mode? It seems to be an open issue: https://issues.apache.org/jira/browse/FLINK-16175
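   For context on the case-sensitivity question: Iceberg's `Evaluator`, as constructed in `FlinkSourceFilter` above, takes a `caseSensitive` flag that controls how column names in the expression are matched against the schema. A stdlib-only sketch of what that flag changes, using a hypothetical `bindColumn` helper rather than Iceberg's actual binding code:

   ```java
   import java.util.List;

   // Hypothetical stand-in for binding a column reference to a schema position.
   final class ColumnBinder {
     private ColumnBinder() {}

     // Returns the position of `name` in `columns`, or -1 if it cannot be bound.
     static int bindColumn(List<String> columns, String name, boolean caseSensitive) {
       for (int i = 0; i < columns.size(); i++) {
         String column = columns.get(i);
         boolean matches = caseSensitive ? column.equals(name) : column.equalsIgnoreCase(name);
         if (matches) {
           return i;
         }
       }
       return -1;
     }
   }
   ```

   With `caseSensitive = true`, a reference to `DATA` fails to bind against a `data` column; with `false` it binds.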





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1142850082


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java:
##########
@@ -291,7 +294,25 @@ public DataStream<RowData> build() {
         if (env.getMaxParallelism() > 0) {
           parallelism = Math.min(parallelism, env.getMaxParallelism());
         }
-        return env.createInput(format, typeInfo).setParallelism(parallelism);
+
+        DataStreamSource<RowData> source =

Review Comment:
   This covers one scenario. There are two other scenarios:
   
   1) Using `FlinkInputFormat` directly, e.g. in `StreamingReaderOperator`:
   ```
     private void processSplits() throws IOException {
       FlinkInputSplit split = splits.poll();
       if (split == null) {
         currentSplitState = SplitState.IDLE;
         return;
       }
   
       format.open(split);
       try {
         RowData nextElement = null;
         while (!format.reachedEnd()) {
           nextElement = format.nextRecord(nextElement);
           sourceContext.collect(nextElement);
         }
       } finally {
         currentSplitState = SplitState.IDLE;
         format.close();
       }
   
       // Re-schedule to process the next split.
       enqueueProcessSplits();
     }
   ```
   
   2) The new Flink FLIP-27 `IcebergSource`. Here is an example from `IcebergTableSource` that shows how users can construct the DataStream. We can fix it in `IcebergTableSource`, but we can't control users' code to add the filter to the `DataStream`. Note that the FLIP-27 source will be the future Flink source.
   ```
     private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
       SplitAssignerType assignerType =
           readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
       IcebergSource<RowData> source =
           IcebergSource.forRowData()
               .tableLoader(loader)
               .assignerFactory(assignerType.factory())
               .properties(properties)
               .project(getProjectedSchema())
               .limit(limit)
               .filters(filters)
               .flinkConfig(readableConfig)
               .build();
       DataStreamSource stream =
           env.fromSource(
               source,
               WatermarkStrategy.noWatermarks(),
               source.name(),
               TypeInformation.of(RowData.class));
       return stream;
     }
   ```
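   The concern can be modeled with hypothetical stand-ins: if the row filter is attached only as an extra operator appended in `build()`, the other two entry points (driving `FlinkInputFormat` directly, and a user-constructed FLIP-27 `IcebergSource`) never see it; if it is applied inside the shared per-split read loop, every entry point does. `FilterPlacement` and its methods are illustrative names, not the Iceberg API.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.LongPredicate;

   // Hypothetical model of the read paths. `filterInReader` selects where the row
   // filter lives: inside the shared split reader, or only downstream in build().
   final class FilterPlacement {
     private static final LongPredicate KEEP_ALL = row -> true;

     private FilterPlacement() {}

     // Shared per-split read loop (stands in for the file scan task reader).
     static List<Long> readSplit(List<Long> split, LongPredicate filter) {
       List<Long> out = new ArrayList<>();
       for (long row : split) {
         if (filter.test(row)) {
           out.add(row);
         }
       }
       return out;
     }

     // Path covered by the current change: build() can append a filter operator.
     static List<Long> viaBuild(List<Long> split, LongPredicate filter, boolean filterInReader) {
       if (filterInReader) {
         return readSplit(split, filter);
       }
       // Filter applied downstream of the source, as a separate operator.
       return readSplit(readSplit(split, KEEP_ALL), filter);
     }

     // Scenarios 1) and 2): nothing appends a downstream filter operator.
     static List<Long> viaOtherPaths(List<Long> split, LongPredicate filter, boolean filterInReader) {
       return readSplit(split, filterInReader ? filter : KEEP_ALL);
     }
   }
   ```

   With the filter only downstream, `viaOtherPaths` returns unfiltered rows; moving it into the reader makes all paths agree.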





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1163758581


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java:
##########
@@ -120,6 +130,20 @@ private CloseableIterable<RowData> newIterable(
       }
     }
 
+    if (this.filters != null && !this.filters.isEmpty()) {
+      Expression combinedExpression = null;

Review Comment:
   Updated!





[GitHub] [iceberg] Fokko commented on a diff in pull request #7109: Flink: Apply row level filtering

Posted by "Fokko (via GitHub)" <gi...@apache.org>.
Fokko commented on code in PR #7109:
URL: https://github.com/apache/iceberg/pull/7109#discussion_r1163781459


##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java:
##########
@@ -72,7 +73,8 @@ public AvroGenericRecordReaderFunction(
     this.io = io;
     this.encryption = encryption;
     this.rowDataReader =
-        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive);
+        new RowDataFileScanTaskReader(
+            tableSchema, readSchema, nameMapping, caseSensitive, Collections.emptyList());

Review Comment:
   Done!


