Posted to commits@phoenix.apache.org by jm...@apache.org on 2016/05/15 15:32:50 UTC
phoenix git commit: PHOENIX-2784 phoenix-spark: Allow coercion of DATE to TIMESTAMP for DataFrame
Repository: phoenix
Updated Branches:
refs/heads/master ae14e38cc -> 98e783cf6
PHOENIX-2784 phoenix-spark: Allow coercion of DATE to TIMESTAMP for DataFrame
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/98e783cf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/98e783cf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/98e783cf
Branch: refs/heads/master
Commit: 98e783cf6d7b4644660f48961795699d5374ce3f
Parents: ae14e38
Author: Josh Mahonin <jm...@gmail.com>
Authored: Sun May 15 11:30:32 2016 -0400
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Sun May 15 11:30:32 2016 -0400
----------------------------------------------------------------------
.../apache/phoenix/spark/PhoenixSparkIT.scala | 18 ++++++++++---
.../apache/phoenix/spark/DefaultSource.scala | 3 ++-
.../org/apache/phoenix/spark/PhoenixRDD.scala | 27 ++++++++++++++------
.../apache/phoenix/spark/PhoenixRelation.scala | 8 +++---
4 files changed, 41 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
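For context, the new behaviour is driven entirely through the DataFrame reader options. A minimal usage sketch based on the integration test below; the table name DATE_TEST and the localhost ZooKeeper quorum are placeholder values, and `sc` is assumed to be an existing SparkContext:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // Default behaviour: Phoenix DATE columns map to Spark's DateType,
    // which drops hour/minute/second per the java.sql.Date spec.
    val dates = sqlContext.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> "DATE_TEST", "zkUrl" -> "localhost:2181"))
      .load()

    // With the new option, the same columns surface as TimestampType
    // and keep their full resolution.
    val timestamps = sqlContext.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> "DATE_TEST", "zkUrl" -> "localhost:2181",
                   "dateAsTimestamp" -> "true"))
      .load()
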
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98e783cf/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index ad4791d..941c192 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -632,12 +632,24 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
val dt = df.select("COL1").first().getDate(0).getTime
val epoch = new Date().getTime
- // NOTE: Spark DateType drops hour, minute, second, as per the java.sql.Date spec. Unfortunately if you want to
- // read the full date row, you need to use the RDD integration instead. In the future we could force the schema
- // converter to cast the date as a timestamp instead...
+ // NOTE: Spark DateType drops hour, minute, second, as per the java.sql.Date spec
+ // Use 'dateAsTimestamp' option to coerce DATE to TIMESTAMP without losing resolution
// Note that Spark also applies the timezone offset to the returned date epoch. Rather than perform timezone
// gymnastics, just make sure we're within 24H of the epoch generated just now
assert(Math.abs(epoch - dt) < 86400000)
}
+
+ test("Can coerce Phoenix DATE columns to TIMESTAMP through DataFrame API") {
+ val sqlContext = new SQLContext(sc)
+ val df = sqlContext.read
+ .format("org.apache.phoenix.spark")
+ .options(Map("table" -> "DATE_TEST", "zkUrl" -> quorumAddress, "dateAsTimestamp" -> "true"))
+ .load
+ val dtRes = df.select("COL1").first()
+ val ts = dtRes.getTimestamp(0).getTime
+ val epoch = new Date().getTime
+
+ assert(Math.abs(epoch - ts) < 300000)
+ }
}
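A note on the assertion windows above: the existing DateType test allows 86400000 ms (24 hours) of slack because Spark applies a timezone offset to the returned epoch and java.sql.Date truncates the time-of-day, while the new TimestampType test can assert a much tighter 300000 ms (5 minutes) because the full time component survives the coercion.
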
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98e783cf/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index 6327ec2..15d1944 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -29,7 +29,8 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
new PhoenixRelation(
parameters("table"),
- parameters("zkUrl")
+ parameters("zkUrl"),
+ parameters.contains("dateAsTimestamp")
)(sqlContext)
}
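One behavioural detail worth noting: parameters.contains("dateAsTimestamp") keys off the presence of the option rather than its value, so passing "dateAsTimestamp" -> "false" would still enable the coercion. A value-sensitive variant, if that were ever wanted, might look like the following sketch (not part of this commit):

    // Hypothetical stricter check: coerce only when the option is
    // present and explicitly set to "true".
    parameters.get("dateAsTimestamp").exists(_.equalsIgnoreCase("true"))
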
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98e783cf/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 6560fd3..cfaefa1 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -33,7 +33,7 @@ import scala.collection.JavaConverters._
class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
predicate: Option[String] = None, zkUrl: Option[String] = None,
- @transient conf: Configuration)
+ @transient conf: Configuration, dateAsTimestamp: Boolean = false)
extends RDD[PhoenixRecordWritable](sc, Nil) with Logging {
// Make sure to register the Phoenix driver
@@ -102,23 +102,33 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
// Convert our PhoenixRDD to a DataFrame
def toDataFrame(sqlContext: SQLContext): DataFrame = {
- val columnList = PhoenixConfigurationUtil
+ val columnInfoList = PhoenixConfigurationUtil
.getSelectColumnMetadataList(new Configuration(phoenixConf))
.asScala
- val columnNames: Seq[String] = columnList.map(ci => {
- ci.getDisplayName
+ // Keep track of the sql type and column names.
+ val columns: Seq[(String, Int)] = columnInfoList.map(ci => {
+ (ci.getDisplayName, ci.getSqlType)
})
+
// Lookup the Spark catalyst types from the Phoenix schema
- val structFields = phoenixSchemaToCatalystSchema(columnList).toArray
+ val structFields = phoenixSchemaToCatalystSchema(columnInfoList).toArray
// Create the data frame from the converted Spark schema
sqlContext.createDataFrame(map(pr => {
// Create a sequence of column data
- val rowSeq = columnNames.map { name =>
- pr.resultMap(name)
+ val rowSeq = columns.map { case (name, sqlType) =>
+ val res = pr.resultMap(name)
+
+ // Special handling for data types
+ if(dateAsTimestamp && sqlType == 91) { // 91 is the defined type for Date
+ new java.sql.Timestamp(res.asInstanceOf[java.sql.Date].getTime)
+ }
+ else {
+ res
+ }
}
// Create a Spark Row from the sequence
@@ -148,7 +158,8 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
if (columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType
- case t if t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate] => DateType
+ case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && !dateAsTimestamp => DateType
+ case t if (t.isInstanceOf[PDate] || t.isInstanceOf[PUnsignedDate]) && dateAsTimestamp => TimestampType
case t if t.isInstanceOf[PBoolean] => BooleanType
case t if t.isInstanceOf[PVarbinary] || t.isInstanceOf[PBinary] => BinaryType
case t if t.isInstanceOf[PIntegerArray] || t.isInstanceOf[PUnsignedIntArray] => ArrayType(IntegerType, containsNull = true)
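The bare literal 91 in the special-case branch above is the JDBC type code for DATE; java.sql.Types.DATE is defined as exactly that value. An equivalent, self-documenting spelling of the same branch, shown here as a sketch rather than the committed code:

    import java.sql.Types

    // Same special-case handling, using the named JDBC constant
    // (java.sql.Types.DATE == 91) instead of the bare literal.
    val value = if (dateAsTimestamp && sqlType == Types.DATE) {
      new java.sql.Timestamp(res.asInstanceOf[java.sql.Date].getTime)
    } else {
      res
    }
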
http://git-wip-us.apache.org/repos/asf/phoenix/blob/98e783cf/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
index 3b660f9..8d7f9f7 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRelation.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.phoenix.util.StringUtil.escapeStringConstant
-case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlContext: SQLContext)
+case class PhoenixRelation(tableName: String, zkUrl: String, dateAsTimestamp: Boolean = false)(@transient val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan {
/*
@@ -41,7 +41,8 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC
requiredColumns,
Some(buildFilter(filters)),
Some(zkUrl),
- new Configuration()
+ new Configuration(),
+ dateAsTimestamp
).toDataFrame(sqlContext).rdd
}
@@ -53,7 +54,8 @@ case class PhoenixRelation(tableName: String, zkUrl: String)(@transient val sqlC
Seq(),
None,
Some(zkUrl),
- new Configuration()
+ new Configuration(),
+ dateAsTimestamp
).toDataFrame(sqlContext).schema
}
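
End to end, the flag travels DefaultSource -> PhoenixRelation -> PhoenixRDD, so the schema reported by the resulting DataFrame reflects the coercion. A quick sanity check, assuming a DataFrame `df` loaded with dateAsTimestamp set to "true" as in the first example, over a table whose COL1 is a Phoenix DATE:

    import org.apache.spark.sql.types.TimestampType

    // COL1 was declared DATE in Phoenix, but surfaces as TimestampType here
    assert(df.schema("COL1").dataType == TimestampType)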