Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:44:10 UTC

[32/52] [abbrv] phoenix git commit: PHOENIX-2469 Problem with arrays in phoenix-spark (Dawid Wysakowicz)

PHOENIX-2469 Problem with arrays in phoenix-spark (Dawid Wysakowicz)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b8faae52
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b8faae52
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b8faae52

Branch: refs/heads/calcite
Commit: b8faae52c6bee91393678e74de09ab8a215da856
Parents: 00ee941
Author: Josh Mahonin <jm...@interset.com>
Authored: Wed Dec 2 09:17:11 2015 -0500
Committer: Josh Mahonin <jm...@interset.com>
Committed: Wed Dec 2 09:17:11 2015 -0500

----------------------------------------------------------------------
 phoenix-spark/pom.xml                           |   2 +-
 phoenix-spark/src/it/resources/setup.sql        |   7 ++
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 112 ++++++++++++++++++-
 .../phoenix/spark/PhoenixRecordWritable.scala   |  45 +++++---
 4 files changed, 145 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
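
The change addresses writing array columns from Spark back to Phoenix: Spark SQL hands array columns to the writer as scala.collection.mutable.WrappedArray (and RDD-based saves can pass ArrayBuffer or primitive arrays such as Array[Int]), which the previous Array[AnyRef]-only match in PhoenixRecordWritable did not cover. A minimal sketch of the DataFrame usage this enables, adapted from the "Can save arrays from custom dataframes back to phoenix" integration test added below; it assumes a running SparkContext `sc`, the ARRAYBUFFER_TEST_TABLE created in setup.sql, and uses "zk-host:2181" as a stand-in for the test's quorumAddress:

    import org.apache.spark.sql.{Row, SQLContext, SaveMode}
    import org.apache.spark.sql.types._

    val sqlContext = new SQLContext(sc)

    // Schema with two array columns, matching ARRAYBUFFER_TEST_TABLE
    val schema = StructType(Seq(
      StructField("ID", LongType, nullable = false),
      StructField("VCARRAY", ArrayType(StringType)),
      StructField("INTARRAY", ArrayType(IntegerType))))

    val rowRDD = sc.parallelize(Seq(Row(2L, Array("String1", "String2", "String3"), Array(1, 2, 3))))
    val df = sqlContext.createDataFrame(rowRDD, schema)

    // Array columns reach PhoenixRecordWritable as WrappedArray/ArrayBuffer values,
    // which this commit now converts into java.sql.Array before upserting.
    df.write
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", "zkUrl" -> "zk-host:2181"))
      .mode(SaveMode.Overwrite)
      .save()
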


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8faae52/phoenix-spark/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index c0ad626..437f1c3 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -34,7 +34,7 @@
   <name>Phoenix - Spark</name>
 
   <properties>
-    <spark.version>1.5.0</spark.version>
+    <spark.version>1.5.2</spark.version>
     <scala.version>2.10.4</scala.version>
     <scala.binary.version>2.10</scala.binary.version>
     <top.dir>${project.basedir}/..</top.dir>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8faae52/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
index 814d311..137d019 100644
--- a/phoenix-spark/src/it/resources/setup.sql
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -30,6 +30,13 @@ UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
 UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
 CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
 UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
+CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[])
+UPSERT INTO ARRAYBUFFER_TEST_TABLE (ID, VCARRAY, INTARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'], ARRAY[1, 2, 3])
+CREATE TABLE ARRAY_ANYVAL_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, INTARRAY INTEGER[], BIGINTARRAY BIGINT[])
+UPSERT INTO ARRAY_ANYVAL_TEST_TABLE (ID, INTARRAY, BIGINTARRAY) VALUES (1, ARRAY[1, 2, 3], ARRAY[1, 2, 3])
+CREATE TABLE ARRAY_BYTE_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BYTEARRAY TINYINT[])
+UPSERT INTO ARRAY_BYTE_TEST_TABLE (ID, BYTEARRAY) VALUES (1, ARRAY[1, 2, 3])
+CREATE TABLE VARBINARY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BIN BINARY(1), VARBIN VARBINARY, BINARRAY BINARY(1)[])
 CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
 UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
 CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8faae52/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 1a9be6b..e6348f0 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -16,19 +16,17 @@ package org.apache.phoenix.spark
 import java.sql.{Connection, DriverManager}
 import java.util.Date
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, HBaseTestingUtility}
+import org.apache.hadoop.hbase.{HConstants}
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
 import org.apache.phoenix.query.BaseTest
-import org.apache.phoenix.schema.{TableNotFoundException, ColumnNotFoundException}
+import org.apache.phoenix.schema.{ColumnNotFoundException}
 import org.apache.phoenix.schema.types.PVarchar
 import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
-import org.apache.spark.sql.{Row, SaveMode, execution, SQLContext}
+import org.apache.spark.sql.{Row, SaveMode, SQLContext}
 import org.apache.spark.sql.types._
 import org.apache.spark.{SparkConf, SparkContext}
 import org.joda.time.DateTime
 import org.scalatest._
-import org.apache.phoenix.spark._
 
 import scala.collection.mutable.ListBuffer
 
@@ -470,7 +468,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
     df.saveToPhoenix("TABLE2", zkUrl = Some(quorumAddress))
   }
-  
+
   test("Ensure Dataframe supports LIKE and IN filters (PHOENIX-2328)") {
     val sqlContext = new SQLContext(sc)
     val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1",
@@ -527,4 +525,106 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     assert(df.select("COL1").first().getShort(0).toInt == 32767)
     assert(df.select("COL2").first().getByte(0).toInt == 127)
   }
+
+  test("Can save arrays from custom dataframes back to phoenix") {
+    val dataSet = List(Row(2L, Array("String1", "String2", "String3"), Array(1, 2, 3)))
+
+    val sqlContext = new SQLContext(sc)
+
+    val schema = StructType(
+      Seq(StructField("ID", LongType, nullable = false),
+        StructField("VCARRAY", ArrayType(StringType)),
+        StructField("INTARRAY", ArrayType(IntegerType))))
+
+    val rowRDD = sc.parallelize(dataSet)
+
+    // Apply the schema to the RDD.
+    val df = sqlContext.createDataFrame(rowRDD, schema)
+
+    df.write
+      .format("org.apache.phoenix.spark")
+      .options(Map("table" -> "ARRAYBUFFER_TEST_TABLE", "zkUrl" -> quorumAddress))
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT VCARRAY, INTARRAY FROM ARRAYBUFFER_TEST_TABLE WHERE ID = 2")
+    rs.next()
+    val stringArray = rs.getArray(1).getArray().asInstanceOf[Array[String]]
+    val intArray = rs.getArray(2).getArray().asInstanceOf[Array[Int]]
+
+    // Verify the arrays are equal
+    stringArray shouldEqual dataSet(0).getAs[Array[String]](1)
+    intArray shouldEqual dataSet(0).getAs[Array[Int]](2)
+  }
+
+  test("Can save arrays of AnyVal type back to phoenix") {
+    val dataSet = List((2L, Array(1, 2, 3), Array(1L, 2L, 3L)))
+
+    sc
+      .parallelize(dataSet)
+      .saveToPhoenix(
+        "ARRAY_ANYVAL_TEST_TABLE",
+        Seq("ID", "INTARRAY", "BIGINTARRAY"),
+        zkUrl = Some(quorumAddress)
+      )
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT INTARRAY, BIGINTARRAY FROM ARRAY_ANYVAL_TEST_TABLE WHERE ID = 2")
+    rs.next()
+    val intArray = rs.getArray(1).getArray().asInstanceOf[Array[Int]]
+    val longArray = rs.getArray(2).getArray().asInstanceOf[Array[Long]]
+
+    // Verify the arrays are equal
+    intArray shouldEqual dataSet(0)._2
+    longArray shouldEqual dataSet(0)._3
+  }
+
+  test("Can save arrays of Byte type back to phoenix") {
+    val dataSet = List((2L, Array(1.toByte, 2.toByte, 3.toByte)))
+
+    sc
+      .parallelize(dataSet)
+      .saveToPhoenix(
+        "ARRAY_BYTE_TEST_TABLE",
+        Seq("ID", "BYTEARRAY"),
+        zkUrl = Some(quorumAddress)
+      )
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT BYTEARRAY FROM ARRAY_BYTE_TEST_TABLE WHERE ID = 2")
+    rs.next()
+    val byteArray = rs.getArray(1).getArray().asInstanceOf[Array[Byte]]
+
+    // Verify the arrays are equal
+    byteArray shouldEqual dataSet(0)._2
+  }
+
+  test("Can save binary types back to phoenix") {
+    val dataSet = List((2L, Array[Byte](1), Array[Byte](1,2,3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
+
+    sc
+      .parallelize(dataSet)
+      .saveToPhoenix(
+        "VARBINARY_TEST_TABLE",
+        Seq("ID", "BIN", "VARBIN", "BINARRAY"),
+        zkUrl = Some(quorumAddress)
+      )
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT BIN, VARBIN, BINARRAY FROM VARBINARY_TEST_TABLE WHERE ID = 2")
+    rs.next()
+    val byte = rs.getBytes("BIN")
+    val varByte = rs.getBytes("VARBIN")
+    val varByteArray = rs.getArray("BINARRAY").getArray().asInstanceOf[Array[Array[Byte]]]
+
+    // Verify the arrays are equal
+    byte shouldEqual dataSet(0)._2
+    varByte shouldEqual dataSet(0)._3
+    varByteArray shouldEqual dataSet(0)._4
+  }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b8faae52/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index f11f9cc..c35cc54 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -14,13 +14,11 @@
 package org.apache.phoenix.spark
 
 import java.sql.{PreparedStatement, ResultSet}
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.db.DBWritable
-import org.apache.phoenix.schema.types.{PDataType, PDate, PhoenixArray}
+import org.apache.phoenix.schema.types._
 import org.apache.phoenix.util.ColumnInfo
 import org.joda.time.DateTime
-import scala.collection.{immutable, mutable}
-import scala.collection.JavaConversions._
+import scala.collection.{mutable, immutable}
 
 
 class PhoenixRecordWritable(columnMetaDataList: List[ColumnInfo]) extends DBWritable {
@@ -52,16 +50,35 @@ class PhoenixRecordWritable(columnMetaDataList: List[ColumnInfo]) extends DBWrit
             case _ => (v, c.getPDataType)
           }
 
-          // Save as array or object
-          finalObj match {
-            case obj: Array[AnyRef] => {
-              // Create a java.sql.Array, need to lookup the base sql type name
-              val sqlArray = statement.getConnection.createArrayOf(
-                PDataType.arrayBaseType(finalType).getSqlTypeName,
-                obj
-              )
-              statement.setArray(i + 1, sqlArray)
-            }
+
+          // Helper method to create an SQL array for a specific PDatatype, and set it on the statement
+          def setArrayInStatement(obj: Array[AnyRef]): Unit = {
+            // Create a java.sql.Array, need to lookup the base sql type name
+            val sqlArray = statement.getConnection.createArrayOf(
+              PDataType.arrayBaseType(finalType).getSqlTypeName,
+              obj
+            )
+            statement.setArray(i + 1, sqlArray)
+          }
+
+          // Determine whether to save as an array or object
+          (finalObj, finalType) match {
+            case (obj: Array[AnyRef], _) => setArrayInStatement(obj)
+            case (obj: mutable.ArrayBuffer[AnyVal], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]).toArray)
+            case (obj: mutable.ArrayBuffer[AnyRef], _) => setArrayInStatement(obj.toArray)
+            case (obj: mutable.WrappedArray[AnyVal], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]).toArray)
+            case (obj: mutable.WrappedArray[AnyRef], _) => setArrayInStatement(obj.toArray)
+            case (obj: Array[Int], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
+            case (obj: Array[Long], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
+            case (obj: Array[Char], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
+            case (obj: Array[Short], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
+            case (obj: Array[Float], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
+            case (obj: Array[Double], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
+            // PVarbinary and PBinary come in as Array[Byte] but they're SQL objects
+            case (obj: Array[Byte], _ : PVarbinary) => statement.setObject(i + 1, obj)
+            case (obj: Array[Byte], _ : PBinary) => statement.setObject(i + 1, obj)
+            // Otherwise set as array type
+            case (obj: Array[Byte], _) => setArrayInStatement(obj.map(_.asInstanceOf[AnyRef]))
             case _ => statement.setObject(i + 1, finalObj)
           }
         } else {
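
The new dispatch works because java.sql.Connection.createArrayOf takes an Object[] (Array[AnyRef] in Scala), so collections of primitives (Array[Int], WrappedArray[AnyVal], ArrayBuffer[AnyVal], and so on) are boxed element by element before the SQL array is created, while Array[Byte] bound for BINARY/VARBINARY columns is passed straight through as a single SQL object. A minimal standalone sketch of that boxing step, outside the Phoenix writer; the JDBC URL, the "INTEGER" base-type name, and the sample data are illustrative assumptions:

    import java.sql.{Connection, DriverManager}

    // Sketch only (not the Phoenix writer itself): box a Scala primitive array so it can
    // be handed to Connection.createArrayOf, mirroring the setArrayInStatement helper above.
    val conn: Connection = DriverManager.getConnection("jdbc:phoenix:zk-host:2181")

    val ints: Array[Int] = Array(1, 2, 3)
    val boxed: Array[AnyRef] = ints.map(_.asInstanceOf[AnyRef])   // Int => java.lang.Integer

    val sqlArray = conn.createArrayOf("INTEGER", boxed)
    // sqlArray can now be bound with PreparedStatement#setArray, as the writer does.

Matching on the (finalObj, finalType) pair rather than the value alone is what lets Array[Byte] be disambiguated: the same Scala type becomes a TINYINT[] array or a single BINARY/VARBINARY value depending on the Phoenix column's PDataType.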