You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jm...@apache.org on 2015/10/18 17:51:43 UTC

phoenix git commit: PHOENIX-2328 "Unsupported filter" errors when using Spark DataFrame API

Repository: phoenix
Updated Branches:
  refs/heads/master b68521a4e -> 2a4c3c974


PHOENIX-2328 "Unsupported filter" errors when using Spark DataFrame API


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a4c3c97
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a4c3c97
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a4c3c97

Branch: refs/heads/master
Commit: 2a4c3c974f9bcd70dadfd2a3913539d1417eb2e2
Parents: b68521a
Author: Josh Mahonin <jm...@gmail.com>
Authored: Sun Oct 18 11:43:38 2015 -0400
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Sun Oct 18 11:44:43 2015 -0400

----------------------------------------------------------------------
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 45 ++++++++++++++++++++
 .../apache/phoenix/spark/PhoenixRelation.scala  | 10 ++++-
 2 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a4c3c97/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index f610d44..7f97cc7 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -466,6 +466,51 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
     df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress))
   }
+  
+  test("Ensure Dataframe supports LIKE and IN filters (PHOENIX-2328)") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+      "zkUrl" -> quorumAddress))
+
+    // Prefix match (pattern ends with '%'). NOTE(review): '_' is also a LIKE wildcard; confirm which pushdown path these patterns take
+    val res1 = df.filter("COL1 like 'test_row_%'")
+    res1.count() shouldEqual 2
+
+    // Suffix match (pattern starts with '%'); only test_row_1 is expected
+    val res2 = df.filter("COL1 like '%_1'")
+    res2.count() shouldEqual 1
+    res2.first.getString(1) shouldEqual "test_row_1"
+
+    // Infix match ('%' on both sides of the pattern); both rows are expected
+    val res3 = df.filter("COL1 like '%_row_%'")
+    res3.count() shouldEqual 2
+
+    // NOT LIKE with the pattern that matched both rows above, so no rows are expected
+    val res4 = df.filter("COL1 not like '%_row_%'")
+    res4.count() shouldEqual 0
+
+    // NOT LIKE with a pattern that is expected to match no row, so both rows are expected
+    val res5 = df.filter("COL1 not like '%_wor_%'")
+    res5.count() shouldEqual 2
+
+    // IN list naming both expected COL1 values; both rows are expected
+    val res6 = df.filter("COL1 in ('test_row_1', 'test_row_2')")
+    res6.count() shouldEqual 2
+
+    // IN list naming values that are expected to be absent; no rows are expected
+    val res7 = df.filter("COL1 in ('foo', 'bar')")
+    res7.count() shouldEqual 0
+
+    // AND combining a LIKE with a NOT LIKE; only test_row_2 is expected
+    val res8 = df.filter("COL1 like '%_row_%' AND COL1 not like '%_1'")
+    res8.count() shouldEqual 1
+    res8.first.getString(1) shouldEqual "test_row_2"
+
+    // OR of two suffix matches; both rows are expected
+    val res9 = df.filter("COL1 like '%_1' OR COL1 like '%_2'")
+    res9.count() shouldEqual 2
+  }
+
 
   // We can load the type, but it defaults to Spark's default (precision 38, scale 10)
   ignore("Can load decimal types with accurate precision and scale (PHOENIX-2288)") {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a4c3c97/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index eb347f6..3b660f9 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -68,11 +68,16 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC
     var i = 0
 
     filters.foreach(f => {
+      // Assume conjunction for multiple filters, unless otherwise specified
       if (i > 0) {
         filter.append(" AND")
       }
 
       f match {
+        // Spark 1.3.1+ supported filters
+        case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter)))
+        case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter)))
+        case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter)))
         case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}")
         case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}")
         case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}")
@@ -80,7 +85,10 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC
         case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}")
         case IsNull(attr) => filter.append(s" $attr IS NULL")
         case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL")
-        case _ => throw new Exception("Unsupported filter")
+        case In(attr, values) => filter.append(s" $attr IN ${values.map(compileValue).mkString("(", ",", ")")}")
+        case StringStartsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue(value + "%")}")
+        case StringEndsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value)}")
+        case StringContains(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value + "%")}")
       }
 
       i = i + 1