You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/06/11 22:09:06 UTC
[6/6] phoenix git commit: PHOENIX-1968: Should support saving arrays
PHOENIX-1968: Should support saving arrays
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f7d73496
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f7d73496
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f7d73496
Branch: refs/heads/4.x-HBase-1.1
Commit: f7d734966f7172c3bc4a6f0ba31594ba74ee91a1
Parents: bfd860f
Author: ravimagham <ra...@apache.org>
Authored: Thu Jun 11 12:59:48 2015 -0700
Committer: ravimagham <ra...@apache.org>
Committed: Thu Jun 11 12:59:48 2015 -0700
----------------------------------------------------------------------
.../apache/phoenix/spark/PhoenixSparkIT.scala | 21 ++++++++++++++++
.../phoenix/spark/PhoenixRecordWritable.scala | 25 ++++++++++++++++----
2 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f7d73496/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 42e8676..5f256e6 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -415,4 +415,25 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
results.toList shouldEqual checkResults
}
+
+ test("Can save arrays back to phoenix") {
+ val dataSet = List((2L, Array("String1", "String2", "String3")))
+
+ sc
+ .parallelize(dataSet)
+ .saveToPhoenix(
+ "ARRAY_TEST_TABLE",
+ Seq("ID","VCARRAY"),
+ zkUrl = Some(quorumAddress)
+ )
+
+ // Load the results back
+ val stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT VCARRAY FROM ARRAY_TEST_TABLE WHERE ID = 2")
+ rs.next()
+ val sqlArray = rs.getArray(1).getArray().asInstanceOf[Array[String]]
+
+ // Verify the arrays are equal
+ sqlArray shouldEqual dataSet(0)._2
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f7d73496/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
index 67e0bd2..3977657 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRecordWritable.scala
@@ -16,11 +16,12 @@ package org.apache.phoenix.spark
import java.sql.{PreparedStatement, ResultSet}
import org.apache.hadoop.mapreduce.lib.db.DBWritable
import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder
-import org.apache.phoenix.schema.types.{PDate, PhoenixArray}
+import org.apache.phoenix.schema.types.{PDataType, PDate, PhoenixArray}
import org.joda.time.DateTime
import scala.collection.{immutable, mutable}
import scala.collection.JavaConversions._
+
class PhoenixRecordWritable(var encodedColumns: String) extends DBWritable {
val upsertValues = mutable.ArrayBuffer[Any]()
val resultMap = mutable.Map[String, AnyRef]()
@@ -44,13 +45,27 @@ class PhoenixRecordWritable(var encodedColumns: String) extends DBWritable {
upsertValues.zip(columns).zipWithIndex.foreach {
case ((v, c), i) => {
if (v != null) {
+
// Both Java and Joda dates used to work in 4.2.3, but now they must be java.sql.Date
+ // Can override any other types here as needed
val (finalObj, finalType) = v match {
- case dt: DateTime => (new java.sql.Date(dt.getMillis), PDate.INSTANCE.getSqlType)
- case d: java.util.Date => (new java.sql.Date(d.getTime), PDate.INSTANCE.getSqlType)
- case _ => (v, c.getSqlType)
+ case dt: DateTime => (new java.sql.Date(dt.getMillis), PDate.INSTANCE)
+ case d: java.util.Date => (new java.sql.Date(d.getTime), PDate.INSTANCE)
+ case _ => (v, c.getPDataType)
+ }
+
+ // Save as array or object
+ finalObj match {
+ case obj: Array[AnyRef] => {
+ // Create a java.sql.Array, need to lookup the base sql type name
+ val sqlArray = statement.getConnection.createArrayOf(
+ PDataType.arrayBaseType(finalType).getSqlTypeName,
+ obj
+ )
+ statement.setArray(i + 1, sqlArray)
+ }
+ case _ => statement.setObject(i + 1, finalObj)
}
- statement.setObject(i + 1, finalObj, finalType)
} else {
statement.setNull(i + 1, c.getSqlType)
}