You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original archive page) is not preserved in this plain-text rendering.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/19 02:40:27 UTC

spark git commit: [SPARK-4468][SQL] Backports #3334 to branch-1.1

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 ae9b1f690 -> f9739b9c8


[SPARK-4468][SQL] Backports #3334 to branch-1.1

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/3338)
<!-- Reviewable:end -->

Author: Cheng Lian <li...@databricks.com>

Closes #3338 from liancheng/spark-3334-for-1.1 and squashes the following commits:

bd17512 [Cheng Lian] Backports #3334 to branch-1.1


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9739b9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9739b9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9739b9c

Branch: refs/heads/branch-1.1
Commit: f9739b9c886b1c207753ebf7067c09a60eff1695
Parents: ae9b1f6
Author: Cheng Lian <li...@databricks.com>
Authored: Tue Nov 18 17:40:24 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Nov 18 17:40:24 2014 -0800

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetFilters.scala      |  13 ++-
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 107 ++++++++++++-------
 2 files changed, 75 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f9739b9c/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 7c83f1c..0365c34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -213,22 +213,27 @@ private[sql] object ParquetFilters {
         Some(createEqualityFilter(right.name, left, p))
       case p @ EqualTo(left: NamedExpression, right: Literal) if !left.nullable =>
         Some(createEqualityFilter(left.name, right, p))
+
       case p @ LessThan(left: Literal, right: NamedExpression) if !right.nullable =>
-        Some(createLessThanFilter(right.name, left, p))
+        Some(createGreaterThanFilter(right.name, left, p))
       case p @ LessThan(left: NamedExpression, right: Literal) if !left.nullable =>
         Some(createLessThanFilter(left.name, right, p))
+
       case p @ LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
-        Some(createLessThanOrEqualFilter(right.name, left, p))
+        Some(createGreaterThanOrEqualFilter(right.name, left, p))
       case p @ LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
         Some(createLessThanOrEqualFilter(left.name, right, p))
+
       case p @ GreaterThan(left: Literal, right: NamedExpression) if !right.nullable =>
-        Some(createGreaterThanFilter(right.name, left, p))
+        Some(createLessThanFilter(right.name, left, p))
       case p @ GreaterThan(left: NamedExpression, right: Literal) if !left.nullable =>
         Some(createGreaterThanFilter(left.name, right, p))
+
       case p @ GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
-        Some(createGreaterThanOrEqualFilter(right.name, left, p))
+        Some(createLessThanOrEqualFilter(right.name, left, p))
       case p @ GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
         Some(createGreaterThanOrEqualFilter(left.name, right, p))
+
       case _ => None
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f9739b9c/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index c6b790a..10df1fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,20 +17,19 @@
 
 package org.apache.spark.sql.parquet
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
 import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
-
 import parquet.hadoop.ParquetFileWriter
 import parquet.hadoop.util.ContextUtil
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
 import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
 import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
@@ -453,43 +452,46 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   test("create RecordFilter for simple predicates") {
-    val attribute1 = new AttributeReference("first", IntegerType, false)()
-    val predicate1 = new EqualTo(attribute1, new Literal(1, IntegerType))
-    val filter1 = ParquetFilters.createFilter(predicate1)
-    assert(filter1.isDefined)
-    assert(filter1.get.predicate == predicate1, "predicates do not match")
-    assert(filter1.get.isInstanceOf[ComparisonFilter])
-    val cmpFilter1 = filter1.get.asInstanceOf[ComparisonFilter]
-    assert(cmpFilter1.columnName == "first", "column name incorrect")
-
-    val predicate2 = new LessThan(attribute1, new Literal(4, IntegerType))
-    val filter2 = ParquetFilters.createFilter(predicate2)
-    assert(filter2.isDefined)
-    assert(filter2.get.predicate == predicate2, "predicates do not match")
-    assert(filter2.get.isInstanceOf[ComparisonFilter])
-    val cmpFilter2 = filter2.get.asInstanceOf[ComparisonFilter]
-    assert(cmpFilter2.columnName == "first", "column name incorrect")
-
-    val predicate3 = new And(predicate1, predicate2)
-    val filter3 = ParquetFilters.createFilter(predicate3)
-    assert(filter3.isDefined)
-    assert(filter3.get.predicate == predicate3, "predicates do not match")
-    assert(filter3.get.isInstanceOf[AndFilter])
-
-    val predicate4 = new Or(predicate1, predicate2)
-    val filter4 = ParquetFilters.createFilter(predicate4)
-    assert(filter4.isDefined)
-    assert(filter4.get.predicate == predicate4, "predicates do not match")
-    assert(filter4.get.isInstanceOf[OrFilter])
-
-    val attribute2 = new AttributeReference("second", IntegerType, false)()
-    val predicate5 = new GreaterThan(attribute1, attribute2)
-    val badfilter = ParquetFilters.createFilter(predicate5)
-    assert(badfilter.isDefined === false)
-
-    val predicate6 = And(GreaterThan(attribute1, attribute2), GreaterThan(attribute1, attribute2))
-    val badfilter2 = ParquetFilters.createFilter(predicate6)
-    assert(badfilter2.isDefined === false)
+    def checkFilter(predicate: Predicate): Option[CatalystFilter] = {
+      ParquetFilters.createFilter(predicate).map { f =>
+        assertResult(predicate)(f.predicate)
+        f
+      }.orElse {
+        fail(s"filter $predicate not pushed down")
+      }
+    }
+
+    def checkComparisonFilter(predicate: Predicate, columnName: String): Unit = {
+      assertResult(columnName, "column name incorrect") {
+        checkFilter(predicate).map(_.asInstanceOf[ComparisonFilter].columnName).get
+      }
+    }
+
+    def checkInvalidFilter(predicate: Predicate): Unit = {
+      assert(ParquetFilters.createFilter(predicate).isEmpty)
+    }
+
+    val a = 'a.int.notNull
+    val b = 'b.int.notNull
+
+    checkComparisonFilter(a === 1, "a")
+    checkComparisonFilter(Literal(1) === a, "a")
+
+    checkComparisonFilter(a < 4, "a")
+    checkComparisonFilter(a > 4, "a")
+    checkComparisonFilter(a <= 4, "a")
+    checkComparisonFilter(a >= 4, "a")
+
+    checkComparisonFilter(Literal(4) > a, "a")
+    checkComparisonFilter(Literal(4) < a, "a")
+    checkComparisonFilter(Literal(4) >= a, "a")
+    checkComparisonFilter(Literal(4) <= a, "a")
+
+    checkFilter(a === 1 && a < 4)
+    checkFilter(a === 1 || a < 4)
+
+    checkInvalidFilter(a > b)
+    checkInvalidFilter((a > b) && (a > b))
   }
 
   test("test filter by predicate pushdown") {
@@ -516,6 +518,29 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
         assert(result2(49)(1) === 199)
       }
     }
+    for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) {
+      val query1 = sql(s"SELECT * FROM testfiltersource WHERE 150 > $myval AND 100 <= $myval")
+      assert(
+        query1.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+        "Top operator should be ParquetTableScan after pushdown")
+      val result1 = query1.collect()
+      assert(result1.size === 50)
+      assert(result1(0)(1) === 100)
+      assert(result1(49)(1) === 149)
+      val query2 = sql(s"SELECT * FROM testfiltersource WHERE 150 < $myval AND 200 >= $myval")
+      assert(
+        query2.queryExecution.executedPlan(0)(0).isInstanceOf[ParquetTableScan],
+        "Top operator should be ParquetTableScan after pushdown")
+      val result2 = query2.collect()
+      assert(result2.size === 50)
+      if (myval == "myint" || myval == "mylong") {
+        assert(result2(0)(1) === 151)
+        assert(result2(49)(1) === 200)
+      } else {
+        assert(result2(0)(1) === 150)
+        assert(result2(49)(1) === 199)
+      }
+    }
     for(myval <- Seq("myint", "mylong")) {
       val query3 = sql(s"SELECT * FROM testfiltersource WHERE $myval > 190 OR $myval < 10")
       assert(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org