You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/04/21 23:45:16 UTC
[iceberg] branch master updated: Flink: Backport row filter into 1.15 and 1.16 (#7397)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a36d120d75 Flink: Backport row filter into 1.15 and 1.16 (#7397)
a36d120d75 is described below
commit a36d120d755b003ae6bb19a27107a49bf1d780c6
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Sat Apr 22 01:45:09 2023 +0200
Flink: Backport row filter into 1.15 and 1.16 (#7397)
---
.../apache/iceberg/flink/FlinkSourceFilter.java | 49 ++++++++++++++++++++++
.../iceberg/flink/source/FlinkInputFormat.java | 6 ++-
.../apache/iceberg/flink/source/IcebergSource.java | 3 +-
.../flink/source/RowDataFileScanTaskReader.java | 26 ++++++++++++
.../reader/AvroGenericRecordReaderFunction.java | 29 ++++++++++++-
.../flink/source/reader/RowDataReaderFunction.java | 34 ++++++++++++---
.../apache/iceberg/flink/source/TestFlinkScan.java | 43 +++++++++++++++++--
.../iceberg/flink/source/TestFlinkSource.java | 9 +++-
.../flink/source/TestIcebergSourceBounded.java | 9 ++--
.../TestIcebergSourceBoundedGenericRecord.java | 3 +-
.../apache/iceberg/flink/FlinkSourceFilter.java | 49 ++++++++++++++++++++++
.../iceberg/flink/source/FlinkInputFormat.java | 6 ++-
.../apache/iceberg/flink/source/IcebergSource.java | 3 +-
.../flink/source/RowDataFileScanTaskReader.java | 29 +++++++++++++
.../reader/AvroGenericRecordReaderFunction.java | 30 ++++++++++++-
.../flink/source/reader/RowDataReaderFunction.java | 27 +++++++++++-
.../apache/iceberg/flink/source/TestFlinkScan.java | 43 +++++++++++++++++--
.../iceberg/flink/source/TestFlinkSource.java | 9 +++-
.../flink/source/TestIcebergSourceBounded.java | 9 ++--
.../TestIcebergSourceBoundedGenericRecord.java | 3 +-
20 files changed, 387 insertions(+), 32 deletions(-)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java
new file mode 100644
index 0000000000..5fbd84909d
--- /dev/null
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+ private final RowType rowType;
+ private final Evaluator evaluator;
+ private final Types.StructType struct;
+ private volatile RowDataWrapper wrapper;
+
+ public FlinkSourceFilter(Schema schema, Expression expr, boolean caseSensitive) {
+ this.rowType = FlinkSchemaUtil.convert(schema);
+ this.struct = schema.asStruct();
+ this.evaluator = new Evaluator(struct, expr, caseSensitive);
+ }
+
+ @Override
+ public boolean filter(RowData value) {
+ if (wrapper == null) {
+ this.wrapper = new RowDataWrapper(rowType, struct);
+ }
+ return evaluator.eval(wrapper.wrap(value));
+ }
+}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 56fdc61919..9a5123dc48 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -69,7 +69,11 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
} else {
this.rowDataReader =
new RowDataFileScanTaskReader(
- tableSchema, context.project(), context.nameMapping(), context.caseSensitive());
+ tableSchema,
+ context.project(),
+ context.nameMapping(),
+ context.caseSensitive(),
+ context.filters());
}
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index d75e79abf3..0675305e10 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -449,7 +449,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
context.nameMapping(),
context.caseSensitive(),
table.io(),
- table.encryption());
+ table.encryption(),
+ context.filters());
this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
}
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 5fada27d54..c2c587267c 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.source;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;
@@ -29,7 +31,10 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.encryption.InputFilesDecryptor;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkSourceFilter;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.FlinkAvroReader;
import org.apache.iceberg.flink.data.FlinkOrcReader;
@@ -54,13 +59,31 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
private final Schema projectedSchema;
private final String nameMapping;
private final boolean caseSensitive;
+ private final FlinkSourceFilter rowFilter;
public RowDataFileScanTaskReader(
Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
+ this(tableSchema, projectedSchema, nameMapping, caseSensitive, Collections.emptyList());
+ }
+
+ public RowDataFileScanTaskReader(
+ Schema tableSchema,
+ Schema projectedSchema,
+ String nameMapping,
+ boolean caseSensitive,
+ List<Expression> filters) {
this.tableSchema = tableSchema;
this.projectedSchema = projectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
+ if (filters != null && !filters.isEmpty()) {
+ Expression combinedExpression =
+ filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
+ this.rowFilter =
+ new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive);
+ } else {
+ this.rowFilter = null;
+ }
}
@Override
@@ -120,6 +143,9 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
}
}
+ if (rowFilter != null) {
+ return CloseableIterable.filter(iter, rowFilter::filter);
+ }
return iter;
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
index 66c0e0ff23..b1ce166748 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
@@ -18,12 +18,15 @@
*/
package org.apache.iceberg.flink.source.reader;
+import java.util.Collections;
+import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
@@ -57,6 +60,27 @@ public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<
table.encryption());
}
+ public AvroGenericRecordReaderFunction(
+ String name,
+ Configuration config,
+ Schema schema,
+ Schema projectedSchema,
+ String nameMapping,
+ boolean caseSensitive,
+ FileIO io,
+ EncryptionManager encryption) {
+ this(
+ name,
+ config,
+ schema,
+ projectedSchema,
+ nameMapping,
+ caseSensitive,
+ io,
+ encryption,
+ Collections.emptyList());
+ }
+
public AvroGenericRecordReaderFunction(
String tableName,
ReadableConfig config,
@@ -65,14 +89,15 @@ public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<
String nameMapping,
boolean caseSensitive,
FileIO io,
- EncryptionManager encryption) {
+ EncryptionManager encryption,
+ List<Expression> filters) {
super(new ListDataIteratorBatcher<>(config));
this.tableName = tableName;
this.readSchema = readSchema(tableSchema, projectedSchema);
this.io = io;
this.encryption = encryption;
this.rowDataReader =
- new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive);
+ new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters);
}
@Override
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
index c747375d2a..dcf84305f8 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
@@ -18,10 +18,13 @@
*/
package org.apache.iceberg.flink.source.reader;
+import java.util.Collections;
+import java.util.List;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
@@ -36,6 +39,7 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
private final boolean caseSensitive;
private final FileIO io;
private final EncryptionManager encryption;
+ private final List<Expression> filters;
public RowDataReaderFunction(
ReadableConfig config,
@@ -45,23 +49,43 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
boolean caseSensitive,
FileIO io,
EncryptionManager encryption) {
+ this(
+ config,
+ tableSchema,
+ projectedSchema,
+ nameMapping,
+ caseSensitive,
+ io,
+ encryption,
+ Collections.emptyList());
+ }
+
+ public RowDataReaderFunction(
+ ReadableConfig config,
+ Schema schema,
+ Schema project,
+ String nameMapping,
+ boolean caseSensitive,
+ FileIO io,
+ EncryptionManager encryption,
+ List<Expression> filters) {
super(
new ArrayPoolDataIteratorBatcher<>(
config,
- new RowDataRecordFactory(
- FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)))));
- this.tableSchema = tableSchema;
- this.readSchema = readSchema(tableSchema, projectedSchema);
+ new RowDataRecordFactory(FlinkSchemaUtil.convert(readSchema(schema, project)))));
+ this.tableSchema = schema;
+ this.readSchema = readSchema(schema, project);
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.io = io;
this.encryption = encryption;
+ this.filters = filters;
}
@Override
public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
return new DataIterator<>(
- new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive),
+ new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters),
split.task(),
io,
encryption);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index a6cdc212b7..aa5b51eddf 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -88,7 +88,12 @@ public abstract class TestFlinkScan {
protected abstract List<Row> runWithProjection(String... projected) throws Exception;
- protected abstract List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception;
+ protected abstract List<Row> runWithFilter(
+ Expression filter, String sqlFilter, boolean caseSensitive) throws Exception;
+
+ protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
+ return runWithFilter(filter, sqlFilter, true);
+ }
protected abstract List<Row> runWithOptions(Map<String, String> options) throws Exception;
@@ -409,7 +414,7 @@ public abstract class TestFlinkScan {
}
@Test
- public void testFilterExp() throws Exception {
+ public void testFilterExpPartition() throws Exception {
Table table =
catalogResource
.catalog()
@@ -428,11 +433,43 @@ public abstract class TestFlinkScan {
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
helper.appendToTable(dataFile1, dataFile2);
TestHelpers.assertRecords(
- runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'"),
+ runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'", true),
expectedRecords,
TestFixtures.SCHEMA);
}
+ private void testFilterExp(Expression filter, String sqlFilter, boolean caseSensitive)
+ throws Exception {
+ Table table =
+ catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+ List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
+ expectedRecords.get(0).set(0, "a");
+ expectedRecords.get(1).set(0, "b");
+ expectedRecords.get(2).set(0, "c");
+
+ GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+ DataFile dataFile = helper.writeFile(expectedRecords);
+ helper.appendToTable(dataFile);
+
+ List<Row> actual =
+ runWithFilter(filter, sqlFilter, caseSensitive);
+
+ TestHelpers.assertRecords(actual, expectedRecords.subList(1, 3), TestFixtures.SCHEMA);
+ }
+
+ @Test
+ public void testFilterExp() throws Exception {
+ testFilterExp(Expressions.greaterThanOrEqual("data", "b"), "where data>='b'", true);
+ }
+
+ @Test
+ public void testFilterExpCaseInsensitive() throws Exception {
+ // sqlFilter does not support case-insensitive filtering:
+ // https://issues.apache.org/jira/browse/FLINK-16175
+ testFilterExp(Expressions.greaterThanOrEqual("DATA", "b"), "where data>='b'", false);
+ }
+
@Test
public void testPartitionTypes() throws Exception {
Schema typesSchema =
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
index cebce61c08..2b55bee6e5 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
@@ -51,15 +51,20 @@ public abstract class TestFlinkSource extends TestFlinkScan {
}
@Override
- protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
+ protected List<Row> runWithFilter(Expression filter, String sqlFilter, boolean caseSensitive)
+ throws Exception {
FlinkSource.Builder builder =
FlinkSource.forRowData().filters(Collections.singletonList(filter));
- return run(builder, Maps.newHashMap(), sqlFilter, "*");
+ Map<String, String> options = Maps.newHashMap();
+ options.put("case-sensitive", Boolean.toString(caseSensitive));
+ return run(builder, options, sqlFilter, "*");
}
@Override
protected List<Row> runWithOptions(Map<String, String> options) throws Exception {
FlinkSource.Builder builder = FlinkSource.forRowData();
+ Optional.ofNullable(options.get("case-sensitive"))
+ .ifPresent(value -> builder.caseSensitive(Boolean.parseBoolean(value)));
Optional.ofNullable(options.get("snapshot-id"))
.ifPresent(value -> builder.snapshotId(Long.parseLong(value)));
Optional.ofNullable(options.get("tag")).ifPresent(value -> builder.tag(value));
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index 477d121316..a80f87d648 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.source;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -68,8 +68,11 @@ public class TestIcebergSourceBounded extends TestFlinkScan {
}
@Override
- protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
- return run(null, Arrays.asList(filter), Maps.newHashMap(), sqlFilter, "*");
+ protected List<Row> runWithFilter(Expression filter, String sqlFilter, boolean caseSensitive)
+ throws Exception {
+ Map<String, String> options = Maps.newHashMap();
+ options.put("case-sensitive", Boolean.toString(caseSensitive));
+ return run(null, Collections.singletonList(filter), options, sqlFilter, "*");
}
@Override
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
index 4ffd92acee..0337f35970 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
@@ -163,7 +163,8 @@ public class TestIcebergSourceBoundedGenericRecord {
null,
false,
table.io(),
- table.encryption());
+ table.encryption(),
+ filters);
IcebergSource.Builder<GenericRecord> sourceBuilder =
IcebergSource.<GenericRecord>builder()
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java
new file mode 100644
index 0000000000..5fbd84909d
--- /dev/null
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSourceFilter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Types;
+
+public class FlinkSourceFilter implements FilterFunction<RowData> {
+
+ private final RowType rowType;
+ private final Evaluator evaluator;
+ private final Types.StructType struct;
+ private volatile RowDataWrapper wrapper;
+
+ public FlinkSourceFilter(Schema schema, Expression expr, boolean caseSensitive) {
+ this.rowType = FlinkSchemaUtil.convert(schema);
+ this.struct = schema.asStruct();
+ this.evaluator = new Evaluator(struct, expr, caseSensitive);
+ }
+
+ @Override
+ public boolean filter(RowData value) {
+ if (wrapper == null) {
+ this.wrapper = new RowDataWrapper(rowType, struct);
+ }
+ return evaluator.eval(wrapper.wrap(value));
+ }
+}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 56fdc61919..9a5123dc48 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -69,7 +69,11 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
} else {
this.rowDataReader =
new RowDataFileScanTaskReader(
- tableSchema, context.project(), context.nameMapping(), context.caseSensitive());
+ tableSchema,
+ context.project(),
+ context.nameMapping(),
+ context.caseSensitive(),
+ context.filters());
}
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index d75e79abf3..0675305e10 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -449,7 +449,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
context.nameMapping(),
context.caseSensitive(),
table.io(),
- table.encryption());
+ table.encryption(),
+ context.filters());
this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
}
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 5fada27d54..092de8c944 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink.source;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;
@@ -29,7 +31,10 @@ import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.encryption.InputFilesDecryptor;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkSourceFilter;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.FlinkAvroReader;
import org.apache.iceberg.flink.data.FlinkOrcReader;
@@ -55,12 +60,32 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
private final String nameMapping;
private final boolean caseSensitive;
+ private final FlinkSourceFilter rowFilter;
+
public RowDataFileScanTaskReader(
Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
+ this(tableSchema, projectedSchema, nameMapping, caseSensitive, Collections.emptyList());
+ }
+
+ public RowDataFileScanTaskReader(
+ Schema tableSchema,
+ Schema projectedSchema,
+ String nameMapping,
+ boolean caseSensitive,
+ List<Expression> filters) {
this.tableSchema = tableSchema;
this.projectedSchema = projectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
+
+ if (filters != null && !filters.isEmpty()) {
+ Expression combinedExpression =
+ filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
+ this.rowFilter =
+ new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive);
+ } else {
+ this.rowFilter = null;
+ }
}
@Override
@@ -120,6 +145,10 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
}
}
+ if (rowFilter != null) {
+ return CloseableIterable.filter(iter, rowFilter::filter);
+ }
+
return iter;
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
index 66c0e0ff23..d70820e4cc 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
@@ -18,12 +18,15 @@
*/
package org.apache.iceberg.flink.source.reader;
+import java.util.Collections;
+import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
@@ -54,7 +57,8 @@ public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<
null,
false,
table.io(),
- table.encryption());
+ table.encryption(),
+ Collections.emptyList());
}
public AvroGenericRecordReaderFunction(
@@ -66,13 +70,35 @@ public class AvroGenericRecordReaderFunction extends DataIteratorReaderFunction<
boolean caseSensitive,
FileIO io,
EncryptionManager encryption) {
+ this(
+ tableName,
+ config,
+ tableSchema,
+ projectedSchema,
+ nameMapping,
+ caseSensitive,
+ io,
+ encryption,
+ Collections.emptyList());
+ }
+
+ public AvroGenericRecordReaderFunction(
+ String tableName,
+ ReadableConfig config,
+ Schema tableSchema,
+ Schema projectedSchema,
+ String nameMapping,
+ boolean caseSensitive,
+ FileIO io,
+ EncryptionManager encryption,
+ List<Expression> filters) {
super(new ListDataIteratorBatcher<>(config));
this.tableName = tableName;
this.readSchema = readSchema(tableSchema, projectedSchema);
this.io = io;
this.encryption = encryption;
this.rowDataReader =
- new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive);
+ new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters);
}
@Override
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
index c747375d2a..a40fbf0426 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
@@ -18,10 +18,13 @@
*/
package org.apache.iceberg.flink.source.reader;
+import java.util.Collections;
+import java.util.List;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
@@ -36,6 +39,7 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
private final boolean caseSensitive;
private final FileIO io;
private final EncryptionManager encryption;
+ private final List<Expression> filters;
public RowDataReaderFunction(
ReadableConfig config,
@@ -45,6 +49,26 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
boolean caseSensitive,
FileIO io,
EncryptionManager encryption) {
+ this(
+ config,
+ tableSchema,
+ projectedSchema,
+ nameMapping,
+ caseSensitive,
+ io,
+ encryption,
+ Collections.emptyList());
+ }
+
+ public RowDataReaderFunction(
+ ReadableConfig config,
+ Schema tableSchema,
+ Schema projectedSchema,
+ String nameMapping,
+ boolean caseSensitive,
+ FileIO io,
+ EncryptionManager encryption,
+ List<Expression> filters) {
super(
new ArrayPoolDataIteratorBatcher<>(
config,
@@ -56,12 +80,13 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
this.caseSensitive = caseSensitive;
this.io = io;
this.encryption = encryption;
+ this.filters = filters;
}
@Override
public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
return new DataIterator<>(
- new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive),
+ new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters),
split.task(),
io,
encryption);
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
index a6cdc212b7..aa5b51eddf 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
@@ -88,7 +88,12 @@ public abstract class TestFlinkScan {
protected abstract List<Row> runWithProjection(String... projected) throws Exception;
- protected abstract List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception;
+ protected abstract List<Row> runWithFilter(
+ Expression filter, String sqlFilter, boolean caseSensitive) throws Exception;
+
+ protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
+ return runWithFilter(filter, sqlFilter, true);
+ }
protected abstract List<Row> runWithOptions(Map<String, String> options) throws Exception;
@@ -409,7 +414,7 @@ public abstract class TestFlinkScan {
}
@Test
- public void testFilterExp() throws Exception {
+ public void testFilterExpPartition() throws Exception {
Table table =
catalogResource
.catalog()
@@ -428,11 +433,43 @@ public abstract class TestFlinkScan {
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L));
helper.appendToTable(dataFile1, dataFile2);
TestHelpers.assertRecords(
- runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'"),
+ runWithFilter(Expressions.equal("dt", "2020-03-20"), "where dt='2020-03-20'", true),
expectedRecords,
TestFixtures.SCHEMA);
}
+ private void testFilterExp(Expression filter, String sqlFilter, boolean caseSensitive)
+ throws Exception {
+ Table table =
+ catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+
+ List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 3, 0L);
+ expectedRecords.get(0).set(0, "a");
+ expectedRecords.get(1).set(0, "b");
+ expectedRecords.get(2).set(0, "c");
+
+ GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+ DataFile dataFile = helper.writeFile(expectedRecords);
+ helper.appendToTable(dataFile);
+
+ List<Row> actual =
+ runWithFilter(filter, sqlFilter, caseSensitive);
+
+ TestHelpers.assertRecords(actual, expectedRecords.subList(1, 3), TestFixtures.SCHEMA);
+ }
+
+ @Test
+ public void testFilterExp() throws Exception {
+ testFilterExp(Expressions.greaterThanOrEqual("data", "b"), "where data>='b'", true);
+ }
+
+ @Test
+ public void testFilterExpCaseInsensitive() throws Exception {
+ // sqlFilter does not support case-insensitive filtering:
+ // https://issues.apache.org/jira/browse/FLINK-16175
+ testFilterExp(Expressions.greaterThanOrEqual("DATA", "b"), "where data>='b'", false);
+ }
+
@Test
public void testPartitionTypes() throws Exception {
Schema typesSchema =
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
index cebce61c08..2b55bee6e5 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSource.java
@@ -51,15 +51,20 @@ public abstract class TestFlinkSource extends TestFlinkScan {
}
@Override
- protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
+ protected List<Row> runWithFilter(Expression filter, String sqlFilter, boolean caseSensitive)
+ throws Exception {
FlinkSource.Builder builder =
FlinkSource.forRowData().filters(Collections.singletonList(filter));
- return run(builder, Maps.newHashMap(), sqlFilter, "*");
+ Map<String, String> options = Maps.newHashMap();
+ options.put("case-sensitive", Boolean.toString(caseSensitive));
+ return run(builder, options, sqlFilter, "*");
}
@Override
protected List<Row> runWithOptions(Map<String, String> options) throws Exception {
FlinkSource.Builder builder = FlinkSource.forRowData();
+ Optional.ofNullable(options.get("case-sensitive"))
+ .ifPresent(value -> builder.caseSensitive(Boolean.parseBoolean(value)));
Optional.ofNullable(options.get("snapshot-id"))
.ifPresent(value -> builder.snapshotId(Long.parseLong(value)));
Optional.ofNullable(options.get("tag")).ifPresent(value -> builder.tag(value));
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
index 477d121316..a80f87d648 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.flink.source;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -68,8 +68,11 @@ public class TestIcebergSourceBounded extends TestFlinkScan {
}
@Override
- protected List<Row> runWithFilter(Expression filter, String sqlFilter) throws Exception {
- return run(null, Arrays.asList(filter), Maps.newHashMap(), sqlFilter, "*");
+ protected List<Row> runWithFilter(Expression filter, String sqlFilter, boolean caseSensitive)
+ throws Exception {
+ Map<String, String> options = Maps.newHashMap();
+ options.put("case-sensitive", Boolean.toString(caseSensitive));
+ return run(null, Collections.singletonList(filter), options, sqlFilter, "*");
}
@Override
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
index 4ffd92acee..0337f35970 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
@@ -163,7 +163,8 @@ public class TestIcebergSourceBoundedGenericRecord {
null,
false,
table.io(),
- table.encryption());
+ table.encryption(),
+ filters);
IcebergSource.Builder<GenericRecord> sourceBuilder =
IcebergSource.<GenericRecord>builder()