Posted to commits@spark.apache.org by rx...@apache.org on 2015/12/30 22:34:40 UTC

spark git commit: [SPARK-12409][SPARK-12387][SPARK-12391][SQL] Support AND/OR/IN/LIKE push-down filters for JDBC

Repository: spark
Updated Branches:
  refs/heads/master 27a42c710 -> 5c2682b0c


[SPARK-12409][SPARK-12387][SPARK-12391][SQL] Support AND/OR/IN/LIKE push-down filters for JDBC

This is a rework of #10386 that adds more tests and LIKE push-down support.

Author: Takeshi YAMAMURO <li...@gmail.com>

Closes #10468 from maropu/SupportMorePushdownInJdbc.
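
For context, a minimal usage sketch of what this enables (the connection URL and
table name below are hypothetical, not part of this commit): with this change,
predicates like the one below can be compiled into the WHERE clause sent to the
database instead of being evaluated row-by-row in Spark.

  // Hypothetical JDBC source; url/dbtable are placeholders for illustration.
  val df = sqlContext.read
    .format("jdbc")
    .option("url", "jdbc:h2:mem:testdb")
    .option("dbtable", "foobar")
    .load()

  // This filter can now be pushed down roughly as
  //   WHERE (NAME LIKE 'fr%') OR (THEID IN (1, 2))
  df.filter("NAME LIKE 'fr%' OR THEID IN (1, 2)").show()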


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c2682b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c2682b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c2682b0

Branch: refs/heads/master
Commit: 5c2682b0c8fd2aeae2af1adb716ee0d5f8b85135
Parents: 27a42c7
Author: Takeshi YAMAMURO <li...@gmail.com>
Authored: Wed Dec 30 13:34:37 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Dec 30 13:34:37 2015 -0800

----------------------------------------------------------------------
 .../execution/datasources/jdbc/JDBCRDD.scala    |  9 ++++++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 28 +++++++++++++++++++-
 2 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c2682b0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 4e2f505..7072ee4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -179,6 +179,7 @@ private[sql] object JDBCRDD extends Logging {
     case stringValue: String => s"'${escapeSql(stringValue)}'"
     case timestampValue: Timestamp => "'" + timestampValue + "'"
     case dateValue: Date => "'" + dateValue + "'"
+    case arrayValue: Array[Object] => arrayValue.map(compileValue).mkString(", ")
     case _ => value
   }
 
@@ -191,13 +192,19 @@ private[sql] object JDBCRDD extends Logging {
    */
   private def compileFilter(f: Filter): String = f match {
     case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
-    case Not(EqualTo(attr, value)) => s"$attr != ${compileValue(value)}"
+    case Not(f) => s"(NOT (${compileFilter(f)}))"
     case LessThan(attr, value) => s"$attr < ${compileValue(value)}"
     case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}"
     case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}"
     case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}"
+    case StringStartsWith(attr, value) => s"${attr} LIKE '${value}%'"
+    case StringEndsWith(attr, value) => s"${attr} LIKE '%${value}'"
+    case StringContains(attr, value) => s"${attr} LIKE '%${value}%'"
     case IsNull(attr) => s"$attr IS NULL"
     case IsNotNull(attr) => s"$attr IS NOT NULL"
+    case In(attr, value) => s"$attr IN (${compileValue(value)})"
+    case Or(f1, f2) => s"(${compileFilter(f1)}) OR (${compileFilter(f2)})"
+    case And(f1, f2) => s"(${compileFilter(f1)}) AND (${compileFilter(f2)})"
     case _ => null
   }
 

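As an aside for readers skimming the hunk above, here is a self-contained,
simplified sketch of the compilation scheme it implements (illustration only;
the committed code is the hunk above, which also handles timestamps, dates, and
SQL escaping). The key design point is that Not/And/Or now recurse into
compileFilter, so NOT composes with any supported leaf filter instead of only
EqualTo:

  object FilterCompileSketch {
    // Simplified stand-ins for org.apache.spark.sql.sources.Filter, for illustration.
    sealed trait Filter
    case class EqualTo(attr: String, value: Any) extends Filter
    case class In(attr: String, values: Array[Any]) extends Filter
    case class StringStartsWith(attr: String, value: String) extends Filter
    case class Not(child: Filter) extends Filter
    case class And(left: Filter, right: Filter) extends Filter
    case class Or(left: Filter, right: Filter) extends Filter

    private def compileValue(value: Any): Any = value match {
      case s: String => s"'$s'"  // no escaping in this sketch
      case arr: Array[Any] => arr.map(compileValue).mkString(", ")
      case _ => value
    }

    // Recursing here is what makes NOT/AND/OR compose over every leaf filter.
    def compileFilter(f: Filter): String = f match {
      case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
      case In(attr, values) => s"$attr IN (${compileValue(values)})"
      case StringStartsWith(attr, value) => s"$attr LIKE '$value%'"
      case Not(child) => s"(NOT (${compileFilter(child)}))"
      case And(f1, f2) => s"(${compileFilter(f1)}) AND (${compileFilter(f2)})"
      case Or(f1, f2) => s"(${compileFilter(f1)}) OR (${compileFilter(f2)})"
    }

    def main(args: Array[String]): Unit = {
      // Prints: (NOT (col1 IN ('mno', 'pqr'))), matching the JDBCSuite expectation.
      println(compileFilter(Not(In("col1", Array[Any]("mno", "pqr")))))
    }
  }
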
http://git-wip-us.apache.org/repos/asf/spark/blob/5c2682b0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 4044a10..00e37f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -186,8 +187,26 @@ class JDBCSuite extends SparkFunSuite
     assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
     assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
     assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+      .collect().size == 2)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+      .collect().size === 2)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+      .collect().size == 2)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+      + "AND THEID = 2")).collect().size == 2)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+    assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
     assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
     assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+
+    // This test reflects the discussion in SPARK-12218.
+    // Older versions of Spark had this kind of bug in the Parquet data source.
+    val df1 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2 AND NAME != 'mary')")
+    val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')")
+    assert(df1.collect.toSet === Set(Row("mary", 2)))
+    assert(df2.collect.toSet === Set(Row("mary", 2)))
   }
 
   test("SELECT * WHERE (quoted strings)") {
@@ -437,7 +456,11 @@ class JDBCSuite extends SparkFunSuite
     val compileFilter = PrivateMethod[String]('compileFilter)
     def doCompileFilter(f: Filter): String = JDBCRDD invokePrivate compileFilter(f)
     assert(doCompileFilter(EqualTo("col0", 3)) === "col0 = 3")
-    assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "col1 != 'abc'")
+    assert(doCompileFilter(Not(EqualTo("col1", "abc"))) === "(NOT (col1 = 'abc'))")
+    assert(doCompileFilter(And(EqualTo("col0", 0), EqualTo("col1", "def")))
+      === "(col0 = 0) AND (col1 = 'def')")
+    assert(doCompileFilter(Or(EqualTo("col0", 2), EqualTo("col1", "ghi")))
+      === "(col0 = 2) OR (col1 = 'ghi')")
     assert(doCompileFilter(LessThan("col0", 5)) === "col0 < 5")
     assert(doCompileFilter(LessThan("col3",
       Timestamp.valueOf("1995-11-21 00:00:00.0"))) === "col3 < '1995-11-21 00:00:00.0'")
@@ -445,6 +468,9 @@ class JDBCSuite extends SparkFunSuite
     assert(doCompileFilter(LessThanOrEqual("col0", 5)) === "col0 <= 5")
     assert(doCompileFilter(GreaterThan("col0", 3)) === "col0 > 3")
     assert(doCompileFilter(GreaterThanOrEqual("col0", 3)) === "col0 >= 3")
+    assert(doCompileFilter(In("col1", Array("jkl"))) === "col1 IN ('jkl')")
+    assert(doCompileFilter(Not(In("col1", Array("mno", "pqr"))))
+      === "(NOT (col1 IN ('mno', 'pqr')))")
     assert(doCompileFilter(IsNull("col1")) === "col1 IS NULL")
     assert(doCompileFilter(IsNotNull("col1")) === "col1 IS NOT NULL")
   }


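A note on the SPARK-12218 regression test in JDBCSuite above: because Not(f) is
now compiled by recursively wrapping the compiled child filter, De
Morgan-equivalent predicates stay equivalent after push-down. Concretely, the
two test queries are rewrites of each other:

  NOT (a AND b)  is equivalent to  (NOT a) OR (NOT b)

with a = (THEID != 2) and b = (NAME != 'mary'), so both must return the single
row ('mary', 2). The test guards against the kind of bug the in-code comment
references, where older Parquet push-down code could return different results
for such logically equivalent forms.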