Posted to commits@spark.apache.org by li...@apache.org on 2015/12/22 17:40:17 UTC

spark git commit: [SPARK-11677][SQL][FOLLOW-UP] Add tests for checking the ORC filter creation against pushed down filters.

Repository: spark
Updated Branches:
  refs/heads/master 42bfde298 -> 364d244a5


[SPARK-11677][SQL][FOLLOW-UP] Add tests for checking the ORC filter creation against pushed down filters.

https://issues.apache.org/jira/browse/SPARK-11677
Although the existing tests verify the filters indirectly, by checking the number of results when ORC filter push-down is enabled, the filters themselves are not tested. So, this PR adds tests similar to those in `ParquetFilterSuite`.

One difference from `ParquetFilterSuite` is that this `OrcFilterSuite` does not check query results: those are already covered by `OrcQuerySuite`, so this suite only checks that the appropriate filters are created.
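
For context, here is a minimal sketch (not part of this patch) of the switch these tests exercise; the path is hypothetical and a `HiveContext`-backed `sqlContext` is assumed:

    // ORC predicate push-down is off by default in this version; this flag
    // enables it, so that eligible filters are converted into an ORC
    // SearchArgument as tested below.
    sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
    sqlContext.read.orc("/tmp/orc-table")  // hypothetical path
      .filter("_1 > 3")                    // candidate for push-down
      .count()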

Author: hyukjinkwon <gu...@gmail.com>

Closes #10341 from HyukjinKwon/SPARK-11677-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/364d244a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/364d244a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/364d244a

Branch: refs/heads/master
Commit: 364d244a50aab9169ec1abe7e327004e681f8a71
Parents: 42bfde2
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Dec 23 00:39:49 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Wed Dec 23 00:39:49 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/orc/OrcFilterSuite.scala     | 236 +++++++++++++++++++
 1 file changed, 236 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/364d244a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
new file mode 100644
index 0000000..7b61b63
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
+
+import org.apache.spark.sql.{Column, DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
+
+/**
+ * A test suite for ORC filter API based filter pushdown optimization.
+ */
+class OrcFilterSuite extends QueryTest with OrcTest {
+  private def checkFilterPredicate(
+      df: DataFrame,
+      predicate: Predicate,
+      checker: (SearchArgument) => Unit): Unit = {
+    val output = predicate.collect { case a: Attribute => a }.distinct
+    val query = df
+      .select(output.map(e => Column(e)): _*)
+      .where(Column(predicate))
+
+    var maybeRelation: Option[OrcRelation] = None
+    val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: OrcRelation, _)) =>
+        maybeRelation = Some(orcRelation)
+        filters
+    }.flatten.reduceLeftOption(_ && _)
+    assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+    val (_, selectedFilters) =
+      DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+    assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+    val maybeFilter = OrcFilters.createFilter(selectedFilters.toArray)
+    assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters")
+    checker(maybeFilter.get)
+  }
+
+  private def checkFilterPredicate
+      (predicate: Predicate, filterOperator: PredicateLeaf.Operator)
+      (implicit df: DataFrame): Unit = {
+    def checkComparisonOperator(filter: SearchArgument) = {
+      val operator = filter.getLeaves.asScala.head.getOperator
+      assert(operator === filterOperator)
+    }
+    checkFilterPredicate(df, predicate, checkComparisonOperator)
+  }
+
+  private def checkFilterPredicate
+      (predicate: Predicate, stringExpr: String)
+      (implicit df: DataFrame): Unit = {
+    def checkLogicalOperator(filter: SearchArgument) = {
+      assert(filter.toString == stringExpr)
+    }
+    checkFilterPredicate(df, predicate, checkLogicalOperator)
+  }
+
+  test("filter pushdown - boolean") {
+    withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+    }
+  }
+
+  test("filter pushdown - integer") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - long") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - float") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - double") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - string") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === "1", PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> "1", PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < "2", PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > "3", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= "1", PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= "4", PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal("1") === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal("1") <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal("2") > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal("3") < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal("1") >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal("4") <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - binary") {
+    implicit class IntToBinary(int: Int) {
+      def b: Array[Byte] = int.toString.getBytes("UTF-8")
+    }
+
+    withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+    }
+  }
+
+  test("filter pushdown - combinations with logical operators") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
+      // Because `ExpressionTree` is not accessible in Hive 1.2.x, filter creation
+      // involving logical operators such as `and`, `or` and `not` is checked in
+      // string form: this uses `SearchArgument.toString()` to produce the string
+      // expression and compares it to the expected string given below. This may
+      // have to be changed once the Hive version is upgraded.
+      checkFilterPredicate(
+        '_1.isNotNull,
+        """leaf-0 = (IS_NULL _1)
+          |expr = (not leaf-0)""".stripMargin.trim
+      )
+      checkFilterPredicate(
+        '_1 !== 1,
+        """leaf-0 = (EQUALS _1 1)
+          |expr = (not leaf-0)""".stripMargin.trim
+      )
+      checkFilterPredicate(
+        !('_1 < 4),
+        """leaf-0 = (LESS_THAN _1 4)
+          |expr = (not leaf-0)""".stripMargin.trim
+      )
+      checkFilterPredicate(
+        '_1 < 2 || '_1 > 3,
+        """leaf-0 = (LESS_THAN _1 2)
+          |leaf-1 = (LESS_THAN_EQUALS _1 3)
+          |expr = (or leaf-0 (not leaf-1))""".stripMargin.trim
+      )
+      checkFilterPredicate(
+        '_1 < 2 && '_1 > 3,
+        """leaf-0 = (LESS_THAN _1 2)
+          |leaf-1 = (LESS_THAN_EQUALS _1 3)
+          |expr = (and leaf-0 (not leaf-1))""".stripMargin.trim
+      )
+    }
+  }
+}
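
A note on the expected operators in the tests above: ORC's `SearchArgument` has no greater-than leaf, so predicates like `'_1 > 3` are pushed down as the negation of a `LESS_THAN_EQUALS` leaf, which is exactly what both the operator checks and the string-form expectations assert. A minimal sketch of that shape, assuming it runs inside the `org.apache.spark.sql.hive.orc` package (where the package-private `OrcFilters` is accessible):

    import org.apache.spark.sql.sources.{Filter, GreaterThan}

    // Build the SearchArgument that `'_1 > 3` is expected to turn into.
    val maybeFilter = OrcFilters.createFilter(Array[Filter](GreaterThan("_1", 3)))
    maybeFilter.foreach { sarg =>
      // Expected string form, matching the expectations above:
      //   leaf-0 = (LESS_THAN_EQUALS _1 3)
      //   expr = (not leaf-0)
      println(sarg)
    }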

