You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/10/28 22:58:11 UTC
[iceberg] branch master updated: Fix IN predicate performance (#1672)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 444779d Fix IN predicate performance (#1672)
444779d is described below
commit 444779d86ba7fc21a1754aa21610aa7d1dc3ea23
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Wed Oct 28 15:57:56 2020 -0700
Fix IN predicate performance (#1672)
---
.../iceberg/expressions/InclusiveMetricsEvaluator.java | 7 +++++++
.../apache/iceberg/expressions/ManifestEvaluator.java | 7 +++++++
.../expressions/TestInclusiveMetricsEvaluator.java | 10 ++++++++++
.../apache/iceberg/data/TestMetricsRowGroupFilter.java | 17 +++++++++++++++++
.../iceberg/parquet/ParquetMetricsRowGroupFilter.java | 7 +++++++
5 files changed, 48 insertions(+)
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index 7bad98c..344a453 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -46,6 +46,8 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
* return value of {@code eval} is false.
*/
public class InclusiveMetricsEvaluator {
+ private static final int IN_PREDICATE_LIMIT = 200;
+
private final Expression expr;
public InclusiveMetricsEvaluator(Schema schema, Expression unbound) {
@@ -274,6 +276,11 @@ public class InclusiveMetricsEvaluator {
Collection<T> literals = literalSet;
+ if (literals.size() > IN_PREDICATE_LIMIT) {
+ // skip evaluating the predicate if the number of values is too big
+ return ROWS_MIGHT_MATCH;
+ }
+
if (lowerBounds != null && lowerBounds.containsKey(id)) {
T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
literals = literals.stream().filter(v -> ref.comparator().compare(lower, v) <= 0).collect(Collectors.toList());
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
index 5e8be15..e0f4728 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
@@ -48,6 +48,8 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
* return value of {@code eval} is false.
*/
public class ManifestEvaluator {
+ private static final int IN_PREDICATE_LIMIT = 200;
+
private final StructType struct;
private final Expression expr;
@@ -252,6 +254,11 @@ public class ManifestEvaluator {
Collection<T> literals = literalSet;
+ if (literals.size() > IN_PREDICATE_LIMIT) {
+ // skip evaluating the predicate if the number of values is too big
+ return ROWS_MIGHT_MATCH;
+ }
+
T lower = Conversions.fromByteBuffer(ref.type(), fieldStats.lowerBound());
literals = literals.stream().filter(v -> ref.comparator().compare(lower, v) <= 0).collect(Collectors.toList());
if (literals.isEmpty()) { // if all values are less than lower bound, rows cannot match.
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
index 57f5255..3863261 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.expressions;
+import java.util.List;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
@@ -26,6 +27,7 @@ import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.TestHelpers.TestDataFile;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.StringType;
@@ -484,6 +486,14 @@ public class TestInclusiveMetricsEvaluator {
shouldRead = new InclusiveMetricsEvaluator(SCHEMA,
in("no_nulls", "abc", "def")).eval(FILE);
Assert.assertTrue("Should read: in on no nulls column", shouldRead);
+
+ // should read as the number of elements in the in expression is too big
+ List<Integer> ids = Lists.newArrayListWithExpectedSize(400);
+ for (int id = -400; id <= 0; id++) {
+ ids.add(id);
+ }
+ shouldRead = new InclusiveMetricsEvaluator(SCHEMA, in("id", ids)).eval(FILE);
+ Assert.assertTrue("Should read: large in expression", shouldRead);
}
@Test
diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index 7633bb8..1d40be1 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.data;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.List;
import java.util.Locale;
import java.util.UUID;
import org.apache.avro.generic.GenericData.Record;
@@ -750,6 +751,22 @@ public class TestMetricsRowGroupFilter {
Assert.assertTrue("Should read: notEqual on some nulls column", shouldRead);
}
+ @Test
+ public void testInLimitParquet() {
+ Assume.assumeTrue(format == FileFormat.PARQUET);
+
+ boolean shouldRead = shouldRead(in("id", 1, 2));
+ Assert.assertFalse("Should not read if IN is evaluated", shouldRead);
+
+ List<Integer> ids = Lists.newArrayListWithExpectedSize(400);
+ for (int id = -400; id <= 0; id++) {
+ ids.add(id);
+ }
+
+ shouldRead = shouldRead(in("id", ids));
+ Assert.assertTrue("Should read if IN is not evaluated", shouldRead);
+ }
+
private boolean shouldRead(Expression expression) {
return shouldRead(expression, true);
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 3aa2660..e42f440 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -47,6 +47,8 @@ import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
public class ParquetMetricsRowGroupFilter {
+ private static final int IN_PREDICATE_LIMIT = 200;
+
private final Schema schema;
private final Expression expr;
@@ -373,6 +375,11 @@ public class ParquetMetricsRowGroupFilter {
Collection<T> literals = literalSet;
+ if (literals.size() > IN_PREDICATE_LIMIT) {
+ // skip evaluating the predicate if the number of values is too big
+ return ROWS_MIGHT_MATCH;
+ }
+
T lower = min(colStats, id);
literals = literals.stream().filter(v -> ref.comparator().compare(lower, v) <= 0).collect(Collectors.toList());
if (literals.isEmpty()) { // if all values are less than lower bound, rows cannot match.