You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2016/09/27 16:50:20 UTC

spark git commit: [SPARK-16516][SQL] Support for pushing down filters for decimal and timestamp types in ORC

Repository: spark
Updated Branches:
  refs/heads/master 5de1737b0 -> 2cac3b2d4


[SPARK-16516][SQL] Support for pushing down filters for decimal and timestamp types in ORC

## What changes were proposed in this pull request?

It seems ORC supports all the types in [`PredicateLeaf.Type`](https://github.com/apache/hive/blob/e085b7e9bd059d91aaf013df0db4d71dca90ec6f/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java#L50-L56), which include the timestamp and decimal types.

In more detail, the types listed in [`SearchArgumentImpl.boxLiteral()`](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) can be used as filter values.

FYI, the initial `case` clause for supported types was introduced in https://github.com/apache/spark/commit/65d71bd9fbfe6fe1b741c80fed72d6ae3d22b028 and has not been changed since. At that time, the Hive version was 0.13, which supports only some types for filter push-down (see [SearchArgumentImpl.java#L945-L965](https://github.com/apache/hive/blob/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L945-L965) at 0.13).

However, the version was upgraded to 1.2.x and now it supports more types (see [SearchArgumentImpl.java#L1068-L1093](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) at 1.2.0).

## How was this patch tested?

Unit tests in `OrcFilterSuite` and `OrcQuerySuite`

Author: hyukjinkwon <gu...@gmail.com>

Closes #14172 from HyukjinKwon/SPARK-16516.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cac3b2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cac3b2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cac3b2d

Branch: refs/heads/master
Commit: 2cac3b2d4a4a4f3d0d45af4defc23bb0ba53484b
Parents: 5de1737
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Sep 28 00:50:12 2016 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Wed Sep 28 00:50:12 2016 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/orc/OrcFilters.scala  |  1 +
 .../spark/sql/hive/orc/OrcFilterSuite.scala     | 62 +++++++++++++++++---
 .../spark/sql/hive/orc/OrcQuerySuite.scala      | 35 +++++++++++
 3 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2cac3b2d/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 6ab8244..d9efd0c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -84,6 +84,7 @@ private[orc] object OrcFilters extends Logging {
       // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
       case ByteType | ShortType | FloatType | DoubleType => true
       case IntegerType | LongType | StringType | BooleanType => true
+      case TimestampType | _: DecimalType => true
       case _ => false
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2cac3b2d/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 471192a..222c249 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -229,6 +229,59 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     }
   }
 
+  test("filter pushdown - decimal") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === BigDecimal.valueOf(1), PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> BigDecimal.valueOf(1), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < BigDecimal.valueOf(2), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > BigDecimal.valueOf(3), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= BigDecimal.valueOf(1), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= BigDecimal.valueOf(4), PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(2)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(3)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(4)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - timestamp") {
+    val timeString = "2015-08-20 14:57:00"
+    val timestamps = (1 to 4).map { i =>
+      val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+      new Timestamp(milliseconds)
+    }
+    withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === timestamps(0), PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(timestamps(0)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(timestamps(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(timestamps(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
   test("filter pushdown - combinations with logical operators") {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       // Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked
@@ -277,19 +330,10 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
       checkNoFilterPredicate('_1.isNull)
     }
-    // DecimalType
-    withOrcDataFrame((1 to 4).map(i => Tuple1(BigDecimal.valueOf(i)))) { implicit df =>
-      checkNoFilterPredicate('_1 <= BigDecimal.valueOf(4))
-    }
     // BinaryType
     withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
       checkNoFilterPredicate('_1 <=> 1.b)
     }
-    // TimestampType
-    val stringTimestamp = "2015-08-20 15:57:00"
-    withOrcDataFrame(Seq(Tuple1(Timestamp.valueOf(stringTimestamp)))) { implicit df =>
-      checkNoFilterPredicate('_1 <=> Timestamp.valueOf(stringTimestamp))
-    }
     // DateType
     val stringDate = "2015-01-01"
     withOrcDataFrame(Seq(Tuple1(Date.valueOf(stringDate)))) { implicit df =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2cac3b2d/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index b13878d..b2ee49c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.orc
 
 import java.nio.charset.StandardCharsets
+import java.sql.Timestamp
 
 import org.scalatest.BeforeAndAfterAll
 
@@ -500,6 +501,40 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("Support for pushing down filters for decimal types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
+      withTempPath { file =>
+        // It needs to repartition data so that we can have several ORC files
+        // in order to skip stripes in ORC.
+        createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+        val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
+        val actual = stripSparkFilter(df).count()
+
+        assert(actual < 10)
+      }
+    }
+  }
+
+  test("Support for pushing down filters for timestamp types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val timeString = "2015-08-20 14:57:00"
+      val data = (0 until 10).map { i =>
+        val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+        Tuple1(new Timestamp(milliseconds))
+      }
+      withTempPath { file =>
+        // It needs to repartition data so that we can have several ORC files
+        // in order to skip stripes in ORC.
+        createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+        val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
+        val actual = stripSparkFilter(df).count()
+
+        assert(actual < 10)
+      }
+    }
+  }
+
   test("column nullability and comment - write and then read") {
     val schema = (new StructType)
       .add("cl1", IntegerType, nullable = false, comment = "test")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org