You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jm...@apache.org on 2015/10/18 17:51:43 UTC
phoenix git commit: PHOENIX-2328 "Unsupported filter" errors when
using Spark DataFrame API
Repository: phoenix
Updated Branches:
refs/heads/master b68521a4e -> 2a4c3c974
PHOENIX-2328 "Unsupported filter" errors when using Spark DataFrame API
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a4c3c97
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a4c3c97
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a4c3c97
Branch: refs/heads/master
Commit: 2a4c3c974f9bcd70dadfd2a3913539d1417eb2e2
Parents: b68521a
Author: Josh Mahonin <jm...@gmail.com>
Authored: Sun Oct 18 11:43:38 2015 -0400
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Sun Oct 18 11:44:43 2015 -0400
----------------------------------------------------------------------
.../apache/phoenix/spark/PhoenixSparkIT.scala | 45 ++++++++++++++++++++
.../apache/phoenix/spark/PhoenixRelation.scala | 10 ++++-
2 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a4c3c97/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index f610d44..7f97cc7 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -466,6 +466,51 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress))
}
+
+ // Regression test for PHOENIX-2328: before this commit, pushing a LIKE,
+ // NOT LIKE, IN, or compound AND/OR predicate through the DataFrame API
+ // raised "Unsupported filter" in PhoenixRelation.buildFilter. Each case
+ // below exercises one of the newly supported Spark filter types against
+ // TABLE1 (seeded with rows "test_row_1" and "test_row_2").
+ test("Ensure Dataframe supports LIKE and IN filters (PHOENIX-2328)") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
+ "zkUrl" -> quorumAddress))
+
+ // Prefix match (Spark pushes this down as StringStartsWith)
+ val res1 = df.filter("COL1 like 'test_row_%'")
+ res1.count() shouldEqual 2
+
+ // Suffix match (pushed down as StringEndsWith)
+ val res2 = df.filter("COL1 like '%_1'")
+ res2.count() shouldEqual 1
+ res2.first.getString(1) shouldEqual "test_row_1"
+
+ // Infix match (pushed down as StringContains)
+ val res3 = df.filter("COL1 like '%_row_%'")
+ res3.count() shouldEqual 2
+
+ // Not like, match none (pushed down as Not(StringContains))
+ val res4 = df.filter("COL1 not like '%_row_%'")
+ res4.count() shouldEqual 0
+
+ // Not like, match all
+ val res5 = df.filter("COL1 not like '%_wor_%'")
+ res5.count() shouldEqual 2
+
+ // "IN", match all (pushed down as In)
+ val res6 = df.filter("COL1 in ('test_row_1', 'test_row_2')")
+ res6.count() shouldEqual 2
+
+ // "IN", match none
+ val res7 = df.filter("COL1 in ('foo', 'bar')")
+ res7.count() shouldEqual 0
+
+ // AND (and not again) — compound filter pushed down as And(..., Not(...))
+ val res8 = df.filter("COL1 like '%_row_%' AND COL1 not like '%_1'")
+ res8.count() shouldEqual 1
+ res8.first.getString(1) shouldEqual "test_row_2"
+
+ // OR — compound filter pushed down as Or(...)
+ val res9 = df.filter("COL1 like '%_1' OR COL1 like '%_2'")
+ res9.count() shouldEqual 2
+ }
+
// We can load the type, but it defaults to Spark's default (precision 38, scale 10)
ignore("Can load decimal types with accurate precision and scale (PHOENIX-2288)") {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a4c3c97/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index eb347f6..3b660f9 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -68,11 +68,16 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC
var i = 0
filters.foreach(f => {
+ // Assume conjunction for multiple filters, unless otherwise specified
if (i > 0) {
filter.append(" AND")
}
f match {
+ // Spark 1.3.1+ supported filters
+ case And(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter, rightFilter)))
+ case Or(leftFilter, rightFilter) => filter.append(buildFilter(Array(leftFilter)) + " OR " + buildFilter(Array(rightFilter)))
+ case Not(aFilter) => filter.append(" NOT " + buildFilter(Array(aFilter)))
case EqualTo(attr, value) => filter.append(s" $attr = ${compileValue(value)}")
case GreaterThan(attr, value) => filter.append(s" $attr > ${compileValue(value)}")
case GreaterThanOrEqual(attr, value) => filter.append(s" $attr >= ${compileValue(value)}")
@@ -80,7 +85,10 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC
case LessThanOrEqual(attr, value) => filter.append(s" $attr <= ${compileValue(value)}")
case IsNull(attr) => filter.append(s" $attr IS NULL")
case IsNotNull(attr) => filter.append(s" $attr IS NOT NULL")
- case _ => throw new Exception("Unsupported filter")
+ case In(attr, values) => filter.append(s" $attr IN ${values.map(compileValue).mkString("(", ",", ")")}")
+ case StringStartsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue(value + "%")}")
+ case StringEndsWith(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value)}")
+ case StringContains(attr, value) => filter.append(s" $attr LIKE ${compileValue("%" + value + "%")}")
}
i = i + 1