You are viewing a plain text version of this content; the canonical link to the original page is not included in this plain-text export.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/02/03 17:08:07 UTC
[incubator-iceberg] branch master updated: Spark: Convert In filter to in expression (#749)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 905637d Spark: Convert In filter to in expression (#749)
905637d is described below
commit 905637d9efc4a84b15ed54024f60abb39ad44edd
Author: jun-he <ju...@users.noreply.github.com>
AuthorDate: Mon Feb 3 09:07:54 2020 -0800
Spark: Convert In filter to in expression (#749)
---
.../org/apache/iceberg/spark/SparkFilters.java | 15 +++++----
.../iceberg/spark/source/TestFilteredScan.java | 37 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
index c6f8f52..80bae98 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
@@ -23,6 +23,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.sql.Date;
import java.sql.Timestamp;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expression.Operation;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -41,11 +44,11 @@ import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.Or;
import org.apache.spark.sql.sources.StringStartsWith;
-import static org.apache.iceberg.expressions.Expressions.alwaysFalse;
import static org.apache.iceberg.expressions.Expressions.and;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.greaterThan;
import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
import static org.apache.iceberg.expressions.Expressions.isNull;
import static org.apache.iceberg.expressions.Expressions.lessThan;
import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
@@ -122,11 +125,11 @@ public class SparkFilters {
case IN:
In inFilter = (In) filter;
- Expression in = alwaysFalse();
- for (Object value : inFilter.values()) {
- in = or(in, equal(inFilter.attribute(), convertLiteral(value)));
- }
- return in;
+ return in(inFilter.attribute(),
+ Stream.of(inFilter.values())
+ .filter(Objects::nonNull)
+ .map(SparkFilters::convertLiteral)
+ .collect(Collectors.toList()));
case NOT:
Not notFilter = (Not) filter;
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 389a11f..e16c081 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -56,6 +56,7 @@ import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.In;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.StringStartsWith;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -426,6 +427,42 @@ public class TestFilteredScan {
}
@Test
+ public void testInFilter() {
+ File location = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", location.toString())
+ );
+
+ IcebergSource source = new IcebergSource();
+ DataSourceReader reader = source.createReader(options);
+ pushFilters(reader, new In("data", new String[]{"foo", "junction", "brush", null}));
+
+ Assert.assertEquals(2, reader.planInputPartitions().size());
+ }
+
+ @Test
+ public void testInFilterForTimestamp() {
+ File location = buildPartitionedTable("partitioned_by_hour", PARTITION_BY_HOUR, "ts_hour", "ts");
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", location.toString())
+ );
+
+ IcebergSource source = new IcebergSource();
+ DataSourceReader reader = source.createReader(options);
+ pushFilters(reader, new In("ts", new Timestamp[]{
+ new Timestamp(timestamp("2017-12-22T00:00:00.123+00:00") / 1000),
+ new Timestamp(timestamp("2017-12-22T09:20:44.294+00:00") / 1000),
+ new Timestamp(timestamp("2017-12-22T00:34:00.184+00:00") / 1000),
+ new Timestamp(timestamp("2017-12-21T15:15:16.230+00:00") / 1000),
+ null
+ }));
+
+ Assert.assertEquals("Should create 1 task for 2017-12-21: 15", 1, reader.planInputPartitions().size());
+ }
+
+ @Test
public void testPartitionedByDataStartsWithFilter() {
File location = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");