You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/02/03 17:08:07 UTC

[incubator-iceberg] branch master updated: Spark: Convert In filter to in expression (#749)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 905637d  Spark: Convert In filter to in expression (#749)
905637d is described below

commit 905637d9efc4a84b15ed54024f60abb39ad44edd
Author: jun-he <ju...@users.noreply.github.com>
AuthorDate: Mon Feb 3 09:07:54 2020 -0800

    Spark: Convert In filter to in expression (#749)
---
 .../org/apache/iceberg/spark/SparkFilters.java     | 15 +++++----
 .../iceberg/spark/source/TestFilteredScan.java     | 37 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
index c6f8f52..80bae98 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
@@ -23,6 +23,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expression.Operation;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -41,11 +44,11 @@ import org.apache.spark.sql.sources.Not;
 import org.apache.spark.sql.sources.Or;
 import org.apache.spark.sql.sources.StringStartsWith;
 
-import static org.apache.iceberg.expressions.Expressions.alwaysFalse;
 import static org.apache.iceberg.expressions.Expressions.and;
 import static org.apache.iceberg.expressions.Expressions.equal;
 import static org.apache.iceberg.expressions.Expressions.greaterThan;
 import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
 import static org.apache.iceberg.expressions.Expressions.isNull;
 import static org.apache.iceberg.expressions.Expressions.lessThan;
 import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
@@ -122,11 +125,11 @@ public class SparkFilters {
 
         case IN:
           In inFilter = (In) filter;
-          Expression in = alwaysFalse();
-          for (Object value : inFilter.values()) {
-            in = or(in, equal(inFilter.attribute(), convertLiteral(value)));
-          }
-          return in;
+          return in(inFilter.attribute(),
+              Stream.of(inFilter.values())
+                  .filter(Objects::nonNull)
+                  .map(SparkFilters::convertLiteral)
+                  .collect(Collectors.toList()));
 
         case NOT:
           Not notFilter = (Not) filter;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 389a11f..e16c081 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -56,6 +56,7 @@ import org.apache.spark.sql.sources.And;
 import org.apache.spark.sql.sources.EqualTo;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.In;
 import org.apache.spark.sql.sources.LessThan;
 import org.apache.spark.sql.sources.StringStartsWith;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -426,6 +427,42 @@ public class TestFilteredScan {
   }
 
   @Test
+  public void testInFilter() { // Pushes an In filter on "data" and checks how many input partitions are planned.
+    File location = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString()) // the reader is pointed at the table location built above
+    );
+
+    IcebergSource source = new IcebergSource();
+    DataSourceReader reader = source.createReader(options);
+    pushFilters(reader, new In("data", new String[]{"foo", "junction", "brush", null})); // value list includes a null entry
+
+    Assert.assertEquals(2, reader.planInputPartitions().size()); // NOTE(review): 2 depends on rows written by buildPartitionedTable (not visible here) - confirm
+  }
+
+  @Test
+  public void testInFilterForTimestamp() { // Pushes an In filter of Timestamp values on "ts" and checks the planned partition count.
+    File location = buildPartitionedTable("partitioned_by_hour", PARTITION_BY_HOUR, "ts_hour", "ts");
+
+    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+        "path", location.toString()) // the reader is pointed at the table location built above
+    );
+
+    IcebergSource source = new IcebergSource();
+    DataSourceReader reader = source.createReader(options);
+    pushFilters(reader, new In("ts", new Timestamp[]{ // four timestamps plus a trailing null entry
+        new Timestamp(timestamp("2017-12-22T00:00:00.123+00:00") / 1000), // NOTE(review): "/ 1000" presumably converts micros to the millis Timestamp expects - confirm
+        new Timestamp(timestamp("2017-12-22T09:20:44.294+00:00") / 1000),
+        new Timestamp(timestamp("2017-12-22T00:34:00.184+00:00") / 1000),
+        new Timestamp(timestamp("2017-12-21T15:15:16.230+00:00") / 1000),
+        null
+    }));
+
+    Assert.assertEquals("Should create 1 task for 2017-12-21: 15", 1, reader.planInputPartitions().size());
+  }
+
+  @Test
   public void testPartitionedByDataStartsWithFilter() {
     File location = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");