You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jm...@apache.org on 2015/07/07 01:42:12 UTC
phoenix git commit: PHOENIX-2036 PhoenixConfigurationUtil should
provide a pre-normalize table name to PhoenixRuntime
Repository: phoenix
Updated Branches:
refs/heads/master 1e606d579 -> 39c982f92
PHOENIX-2036 PhoenixConfigurationUtil should provide a pre-normalize table name to PhoenixRuntime
Update phoenix-spark to follow the same normalization requirement.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/39c982f9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/39c982f9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/39c982f9
Branch: refs/heads/master
Commit: 39c982f923033b97c477464d0c4e27221421774d
Parents: 1e606d5
Author: Josh Mahonin <jm...@gmail.com>
Authored: Mon Jul 6 19:39:31 2015 -0400
Committer: Josh Mahonin <jm...@apache.org>
Committed: Mon Jul 6 19:41:38 2015 -0400
----------------------------------------------------------------------
phoenix-spark/src/it/resources/setup.sql | 4 +-
.../apache/phoenix/spark/PhoenixSparkIT.scala | 58 ++++++++++++--------
.../org/apache/phoenix/spark/PhoenixRDD.scala | 24 +++-----
3 files changed, 46 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/39c982f9/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index 40157a2..154a996 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -32,4 +32,6 @@ CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[]
UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
-CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
\ No newline at end of file
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
+CREATE TABLE CUSTOM_ENTITY."z02"(id BIGINT NOT NULL PRIMARY KEY)
+UPSERT INTO CUSTOM_ENTITY."z02" (id) VALUES(1)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/39c982f9/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 5f256e6..e1c9df4 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -20,9 +20,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, HBaseTestingUtility}
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
import org.apache.phoenix.query.BaseTest
-import org.apache.phoenix.schema.ColumnNotFoundException
+import org.apache.phoenix.schema.{TableNotFoundException, ColumnNotFoundException}
import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.util.ColumnInfo
+import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
import org.apache.spark.sql.{SaveMode, execution, SQLContext}
import org.apache.spark.sql.types.{LongType, DataType, StringType, StructField}
import org.apache.spark.{SparkConf, SparkContext}
@@ -96,23 +96,6 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
PhoenixSparkITHelper.doTeardown
}
- def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
- val query = "SELECT %s FROM \"%s\"" format(columns.map(f => "\"" + f + "\"").mkString(", "), table)
-
- query + (predicate match {
- case Some(p: String) => " WHERE " + p
- case _ => ""
- })
- }
-
- test("Can create valid SQL") {
- val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
- conf = hbaseConfiguration)
-
- rdd.buildSql("MyTable", Array("Foo", "Bar"), None) should
- equal("SELECT \"Foo\", \"Bar\" FROM \"MyTable\"")
- }
-
test("Can convert Phoenix schema") {
val phoenixSchema = List(
new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
@@ -154,7 +137,9 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
val sqlContext = new SQLContext(sc)
- val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+ val df1 = sqlContext.phoenixTableAsDataFrame(
+ SchemaUtil.getEscapedArgument("table3"),
+ Array("id", "col1"),
zkUrl = Some(quorumAddress))
df1.registerTempTable("table3")
@@ -191,10 +176,12 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
}
test("Using a predicate referring to a non-existent column should fail") {
- intercept[RuntimeException] {
+ intercept[Exception] {
val sqlContext = new SQLContext(sc)
- val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+ val df1 = sqlContext.phoenixTableAsDataFrame(
+ SchemaUtil.getEscapedArgument("table3"),
+ Array("id", "col1"),
predicate = Some("foo = bar"),
conf = hbaseConfiguration)
@@ -210,7 +197,9 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
test("Can create schema RDD with predicate that will never match") {
val sqlContext = new SQLContext(sc)
- val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+ val df1 = sqlContext.phoenixTableAsDataFrame(
+ SchemaUtil.getEscapedArgument("table3"),
+ Array("id", "col1"),
predicate = Some("\"id\" = -1"),
conf = hbaseConfiguration)
@@ -436,4 +425,27 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
// Verify the arrays are equal
sqlArray shouldEqual dataSet(0)._2
}
+
+ test("Can read from table with schema and escaped table name") {
+ // Manually escape
+ val rdd1 = sc.phoenixTableAsRDD(
+ "CUSTOM_ENTITY.\"z02\"",
+ Seq("ID"),
+ conf = hbaseConfiguration)
+
+ var count = rdd1.count()
+
+ count shouldEqual 1L
+
+ // Use SchemaUtil
+ val rdd2 = sc.phoenixTableAsRDD(
+ SchemaUtil.getEscapedFullTableName("CUSTOM_ENTITY.z02"),
+ Seq("ID"),
+ conf = hbaseConfiguration)
+
+ count = rdd2.count()
+
+ count shouldEqual 1L
+
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/39c982f9/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 773026d..427fb24 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -63,30 +63,22 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
}
}
- def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
- val query = "SELECT %s FROM \"%s\"".format(
- if (columns.isEmpty) "*" else columns.map(f => "\"" + f + "\"").mkString(", "),
- table
- )
-
- query + (predicate match {
- case Some(p: String) if p.length > 0 => " WHERE " + p
- case _ => ""
- })
- }
-
def getPhoenixConfiguration: Configuration = {
// This is just simply not serializable, so don't try, but clone it because
// PhoenixConfigurationUtil mutates it.
val config = HBaseConfiguration.create(conf)
- PhoenixConfigurationUtil.setInputQuery(config, buildSql(table, columns, predicate))
+ PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
+ PhoenixConfigurationUtil.setInputTableName(config, table)
+
if(!columns.isEmpty) {
- PhoenixConfigurationUtil.setSelectColumnNames(config, columns.mkString(","))
+ PhoenixConfigurationUtil.setSelectColumnNames(config, columns.toArray)
+ }
+
+ if(predicate.isDefined) {
+ PhoenixConfigurationUtil.setInputTableConditions(config, predicate.get)
}
- PhoenixConfigurationUtil.setInputTableName(config, table)
- PhoenixConfigurationUtil.setInputClass(config, classOf[PhoenixRecordWritable])
// Override the Zookeeper URL if present. Throw exception if no address given.
zkUrl match {