You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jm...@apache.org on 2016/05/15 15:32:50 UTC

phoenix git commit: PHOENIX-2784 phoenix-spark: Allow coercion of DATE to TIMESTAMP for DataFrame

Repository: phoenix
Updated Branches:
  refs/heads/master ae14e38cc -> 98e783cf6


PHOENIX-2784 phoenix-spark: Allow coercion of DATE to TIMESTAMP for DataFrame


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/98e783cf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/98e783cf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/98e783cf

Branch: refs/heads/master
Commit: 98e783cf6d7b4644660f48961795699d5374ce3f
Parents: ae14e38
Author: Josh Mahonin <jm...@gmail.com>
Authored: Sun May 15 11:30:32 2016 -0400
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Sun May 15 11:30:32 2016 -0400

----------------------------------------------------------------------
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 18 ++++++++++---
 .../apache/phoenix/spark/DefaultSource.scala    |  3 ++-
 .../org/apache/phoenix/spark/PhoenixRDD.scala   | 27 ++++++++++++++------
 .../apache/phoenix/spark/PhoenixRelation.scala  |  8 +++---
 4 files changed, 41 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/98e783cf/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index ad4791d..941c192 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -632,12 +632,24 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     val dt = df.select("COL1").first().getDate(0).getTime
     val epoch = new Date().getTime
 
-    // NOTE: Spark DateType drops hour, minute, second, as per the java.sql.Date spec. Unfortunately if you want to
-    // read the full date row, you need to use the RDD integration instead. In the future we could force the schema
-    // converter to cast the date as a timestamp instead...
+    // NOTE: Spark DateType drops hour, minute, second, as per the java.sql.Date spec
+    // Use 'dateAsTimestamp' option to coerce DATE to TIMESTAMP without losing resolution
 
     // Note that Spark also applies the timezone offset to the returned date epoch. Rather than perform timezone
     // gymnastics, just make sure we're within 24H of the epoch generated just now
     assert(Math.abs(epoch - dt) < 86400000)
   }
+
+  test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") {
+    val sqlContext = new SQLContext(sc)
+    val df = sqlContext.read
+      .format("org.apache.phoenix.spark")
+      .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress, "dateAsTimestamp" -> "true"))
+      .load
+    val dtRes = df.select("COL1").first()
+    val ts = dtRes.getTimestamp(0).getTime
+    val epoch = new Date().getTime
+
+    assert(Math.abs(epoch - ts) < 300000)
+  }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98e783cf/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index 6327ec2..15d1944 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -29,7 +29,8 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
 
     new PhoenixRelation(
       parameters("table"),
-      parameters("zkUrl")
+      parameters("zkUrl"),
+      parameters.contains("dateAsTimestamp")
     )(sqlContext)
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98e783cf/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 6560fd3..cfaefa1 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
 
 class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
                  predicate: Option[String] = None, zkUrl: Option[String] = None,
-                 @transient conf: Configuration)
+                 @transient conf: Configuration, dateAsTimestamp: Boolean = false)
   extends RDD[PhoenixRecordWritable](sc, Nil) with Logging {
 
   // Make sure to register the Phoenix driver
@@ -102,23 +102,33 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
 
   // Convert our PhoenixRDD to a DataFrame
   def toDataFrame(sqlContext: SQLContext): DataFrame = {
-    val columnList = PhoenixConfigurationUtil
+    val columnInfoList = PhoenixConfigurationUtil
       .getSelectColumnMetadataList(new Configuration(phoenixConf))
       .asScala
 
-    val columnNames: Seq[String] = columnList.map(ci => {
-      ci.getDisplayName
+    // Keep track of the sql type and column names.
+    val columns: Seq[(String, Int)] = columnInfoList.map(ci => {
+      (ci.getDisplayName, ci.getSqlType)
     })
 
+
     // Lookup the Spark catalyst types from the Phoenix schema
-    val structFields = phoenixSchemaToCatalystSchema(columnList).toArray
+    val structFields = phoenixSchemaToCatalystSchema(columnInfoList).toArray
 
     // Create the data frame from the converted Spark schema
     sqlContext.createDataFrame(map(pr => {
 
       // Create a sequence of column data
-      val rowSeq = columnNames.map { name =>
-        pr.resultMap(name)
+      val rowSeq = columns.map { case (name, sqlType) =>
+        val res = pr.resultMap(name)
+
+        // Special handling for data types
+        if(dateAsTimestamp && sqlType == 91) { // 91 is the defined type for Date
+          new java.sql.Timestamp(res.asInstanceOf[java.sql.Date].getTime)
+        }
+        else {
+          res
+        }
       }
 
       // Create a Spark Row from the sequence
@@ -148,7 +158,8 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
       if (columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
     case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
     case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
-    case t if t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate] => DateType
+    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
+    case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
     case t if t.isInstanceOf[PBoolean] => BooleanType
     case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
     case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/98e783cf/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 3b660f9..8d7f9f7 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.sources._
 import org.apache.phoenix.util.StringUtil.escapeStringConstant
 
-case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlContext: SQLContext)
+case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext)
     extends BaseRelation with PrunedFilteredScan {
 
   /*
@@ -41,7 +41,8 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC
       requiredColumns,
       Some(buildFilter(filters)),
       Some(zkUrl),
-      new Configuration()
+      new Configuration(),
+      dateAsTimestamp
     ).toDataFrame(sqlContext).rdd
   }
 
@@ -53,7 +54,8 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC
       Seq(),
       None,
       Some(zkUrl),
-      new Configuration()
+      new Configuration(),
+      dateAsTimestamp
     ).toDataFrame(sqlContext).schema
   }