Posted to commits@phoenix.apache.org by an...@apache.org on 2017/04/21 06:19:04 UTC
[2/3] phoenix git commit: PHOENIX-3792 Provide way to skip normalization of column names in phoenix-spark integration
PHOENIX-3792 Provide way to skip normalization of column names in phoenix-spark integration
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/90e32c01
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/90e32c01
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/90e32c01
Branch: refs/heads/master
Commit: 90e32c015207b39330ed7496db7a73dbc7b634f4
Parents: 28af89c
Author: Ankit Singhal <an...@gmail.com>
Authored: Fri Apr 21 11:48:16 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Fri Apr 21 11:48:16 2017 +0530
----------------------------------------------------------------------
phoenix-spark/src/it/resources/globalSetup.sql | 1 +
.../apache/phoenix/spark/PhoenixSparkIT.scala | 27 ++++++++++++++++++--
.../phoenix/spark/DataFrameFunctions.scala | 19 +++++++++++---
.../apache/phoenix/spark/DefaultSource.scala | 2 +-
4 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
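
For orientation, a minimal usage sketch of the new flag (not part of the commit; the table name and quorum address are illustrative). With skipNormalizingIdentifier set, the DataFrame's field names are handed to Phoenix verbatim (for example, as produced by an Avro schema) instead of being run through SchemaUtil.normalizeIdentifier:

    import org.apache.spark.sql.SQLContext
    import org.apache.phoenix.spark._  // brings saveToPhoenix into scope

    val sqlContext = new SQLContext(sc)
    val df = sqlContext.createDataFrame(
      Seq((1L, 1L, "v1"))).toDF("ID", "TABLE3_ID", "t2col1")

    // Keep "t2col1" lower case instead of normalizing it to T2COL1.
    df.saveToPhoenix("TABLE3", zkUrl = Some("localhost:2181"),
      skipNormalizingIdentifier = true)
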
http://git-wip-us.apache.org/repos/asf/phoenix/blob/90e32c01/phoenix-spark/src/it/resources/globalSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
index dc24da7..7ac0039 100644
--- a/phoenix-spark/src/it/resources/globalSetup.sql
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -17,6 +17,7 @@
CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
+CREATE TABLE table3 (id BIGINT NOT NULL PRIMARY KEY, table3_id BIGINT, "t2col1" VARCHAR)
UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')
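
Note that the new table declares "t2col1" in double quotes, so Phoenix treats the column name as case sensitive; an unquoted reference would be normalized to upper case (T2COL1) and would not match. A hedged JDBC sketch, with an illustrative connection URL:

    // Quoted identifiers must also be quoted when queried.
    val conn = java.sql.DriverManager.getConnection("jdbc:phoenix:localhost:2181")
    val rs = conn.createStatement().executeQuery("SELECT \"t2col1\" FROM TABLE3")
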
http://git-wip-us.apache.org/repos/asf/phoenix/blob/90e32c01/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index d53b5ee..b8e44fe 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -20,15 +20,38 @@ import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.joda.time.DateTime
-
+import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
-
+import org.apache.hadoop.conf.Configuration
/**
* Note: If running directly from an IDE, these are the recommended VM parameters:
* -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
*/
class PhoenixSparkIT extends AbstractPhoenixSparkIT {
+ test("Can persist data with case senstive columns (like in avro schema) using 'DataFrame.saveToPhoenix'") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.createDataFrame(
+ Seq(
+ (1, 1, "test_child_1"),
+ (2, 1, "test_child_2"))).toDF("ID", "TABLE3_ID", "t2col1")
+ df.saveToPhoenix("TABLE3", zkUrl = Some(quorumAddress), skipNormalizingIdentifier = true)
+
+ // Verify results
+ val stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM TABLE3")
+
+ val checkResults = List((1, 1, "test_child_1"), (2, 1, "test_child_2"))
+ val results = ListBuffer[(Long, Long, String)]()
+ while (rs.next()) {
+ results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+ }
+ stmt.close()
+
+ results.toList shouldEqual checkResults
+
+ }
+
test("Can convert Phoenix schema") {
val phoenixSchema = List(
new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
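
For contrast with the test above (not part of the commit): saving the same df without the flag would normalize t2col1 to T2COL1, a column TABLE3 does not define, so the write is expected to fail with a column-not-found error:

    // Default path: field names go through SchemaUtil.normalizeIdentifier,
    // so "t2col1" becomes T2COL1 and no longer matches TABLE3's schema.
    df.saveToPhoenix("TABLE3", zkUrl = Some(quorumAddress))
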
http://git-wip-us.apache.org/repos/asf/phoenix/blob/90e32c01/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index ddf4fab..92f4c58 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -24,13 +24,16 @@ import scala.collection.JavaConversions._
class DataFrameFunctions(data: DataFrame) extends Serializable {
-
+ def saveToPhoenix(parameters: Map[String, String]): Unit = {
+ saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"),
+ skipNormalizingIdentifier = parameters.contains("skipNormalizingIdentifier"))
+ }
def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
- zkUrl: Option[String] = None, tenantId: Option[String] = None): Unit = {
-
+ zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = {
// Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
- val fieldArray = data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+ val fieldArray = getFieldArray(skipNormalizingIdentifier, data)
+
// Create a configuration object to use for saving
@transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
@@ -61,4 +64,12 @@ class DataFrameFunctions(data: DataFrame) extends Serializable {
outConfig
)
}
+
+ def getFieldArray(skipNormalizingIdentifier: Boolean = false, data: DataFrame) = {
+ if (skipNormalizingIdentifier) {
+ data.schema.fieldNames.map(x => x)
+ } else {
+ data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
+ }
+ }
}
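
As a rough guide to what the two branches of getFieldArray produce: SchemaUtil.normalizeIdentifier upper-cases an unquoted identifier and strips the surrounding double quotes from a quoted one, while the skip path uses the Spark field name untouched:

    import org.apache.phoenix.util.SchemaUtil

    SchemaUtil.normalizeIdentifier("t2col1")       // "T2COL1" (unquoted, upper-cased)
    SchemaUtil.normalizeIdentifier("\"t2col1\"")   // "t2col1" (quotes stripped)
    // With skipNormalizingIdentifier = true the name is used verbatim.
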
http://git-wip-us.apache.org/repos/asf/phoenix/blob/90e32c01/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index 743d196..e000b74 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -44,7 +44,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
verifyParameters(parameters)
// Save the DataFrame to Phoenix
- data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"))
+ data.saveToPhoenix(parameters)
// Return a relation of the saved data
createRelation(sqlContext, parameters)
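
Since DefaultSource now forwards the whole parameter map, the flag can also be passed through the Spark Data Source API. A sketch with an illustrative quorum address; note that DataFrameFunctions only checks for the key's presence, so any value enables the skip:

    import org.apache.spark.sql.SaveMode

    df.write
      .format("org.apache.phoenix.spark")
      .mode(SaveMode.Overwrite)
      .option("table", "TABLE3")
      .option("zkUrl", "localhost:2181")
      .option("skipNormalizingIdentifier", "true")
      .save()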