Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/27 21:42:22 UTC

spark git commit: [SPARK-7790] [SQL] date and decimal conversion for dynamic partition key

Repository: spark
Updated Branches:
  refs/heads/master 6fec1a940 -> 8161562ea


[SPARK-7790] [SQL] date and decimal conversion for dynamic partition key

Author: Daoyuan Wang <da...@intel.com>

Closes #6318 from adrian-wang/dynpart and squashes the following commits:

ad73b61 [Daoyuan Wang] do not use sqlTestUtils for the try/catch because there is no SQLContext here
6c33b51 [Daoyuan Wang] fix according to liancheng's review
f0f8074 [Daoyuan Wang] handle some specific types as dynamic partition keys
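
The underlying issue: Spark SQL stores a DateType value internally as an Int
counting days since the Unix epoch, and a DecimalType value as an internal
Decimal, so building the dynamic partition path with a plain String.valueOf
produced directories like pdate=16576 instead of pdate=2015-05-21. A minimal
standalone Scala sketch of the mismatch, using only JDK types (an
illustration, not the Spark code; the actual fix is in the
hiveWriterContainers.scala hunk below):

    import java.time.LocalDate

    // A DateType value as Spark stores it: days since 1970-01-01.
    val rawDate: Int = 16576
    String.valueOf(rawDate)                       // "16576" (wrong as a partition name)
    LocalDate.ofEpochDay(rawDate.toLong).toString // "2015-05-21" (what Hive expects)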


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8161562e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8161562e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8161562e

Branch: refs/heads/master
Commit: 8161562eabc1eff430cfd9d8eaf413a8c4ef2cfb
Parents: 6fec1a9
Author: Daoyuan Wang <da...@intel.com>
Authored: Wed May 27 12:42:13 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 27 12:42:13 2015 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    |  2 +-
 .../spark/sql/hive/hiveWriterContainers.scala   | 17 ++++++++--
 .../sql/hive/execution/SQLQuerySuite.scala      | 33 ++++++++++++++++++++
 3 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8161562e/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c0b0b10..7a6ca48 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -106,7 +106,7 @@ case class InsertIntoHiveTable(
         }
 
         writerContainer
-          .getLocalFileWriter(row)
+          .getLocalFileWriter(row, table.schema)
           .write(serializer.serialize(outputData, standardOI))
       }
 

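The InsertIntoHiveTable change is just plumbing: the table schema is now
threaded into getLocalFileWriter so the writer container can resolve each
dynamic partition column's DataType by name. A hedged sketch of that lookup
(the schema and field names below are illustrative, not from the patch):

    import org.apache.spark.sql.types._

    // Illustrative schema; in the patch this is table.schema.
    val schema = StructType(Seq(
      StructField("value", IntegerType),
      StructField("pdate", DateType)))

    // StructType.apply(name) returns the StructField; its dataType drives
    // the per-type conversion shown in the next hunk.
    schema("pdate").dataType  // DateType
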
http://git-wip-us.apache.org/repos/asf/spark/blob/8161562e/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index cbc381c..50b209f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -34,8 +34,10 @@ import org.apache.hadoop.hive.common.FileUtils
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.types._
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -92,7 +94,7 @@ private[hive] class SparkHiveWriterContainer(
     "part-" + numberFormat.format(splitID) + extension
   }
 
-  def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
+  def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = writer
 
   def close() {
     // Seems the boolean value passed into close does not matter.
@@ -195,11 +197,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
   }
 
-  override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
+  override def getLocalFileWriter(row: Row, schema: StructType): FileSinkOperator.RecordWriter = {
+    def convertToHiveRawString(col: String, value: Any): String = {
+      val raw = String.valueOf(value)
+      schema(col).dataType match {
+        case DateType => DateUtils.toString(raw.toInt)
+        case _: DecimalType => BigDecimal(raw).toString()
+        case _ => raw
+      }
+    }
+
     val dynamicPartPath = dynamicPartColNames
       .zip(row.toSeq.takeRight(dynamicPartColNames.length))
       .map { case (col, rawVal) =>
-        val string = if (rawVal == null) null else String.valueOf(rawVal)
+        val string = if (rawVal == null) null else convertToHiveRawString(col, rawVal)
         val colString =
           if (string == null || string.isEmpty) {
             defaultPartName

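convertToHiveRawString is the heart of the fix: it keeps the old
String.valueOf behavior for most types, but renders DateType values as
yyyy-MM-dd and routes decimal values through BigDecimal so the partition
string matches what Hive expects. A self-contained approximation, with
java.time.LocalDate standing in for Spark's internal DateUtils (assumed
here to produce the same yyyy-MM-dd rendering):

    import java.time.LocalDate
    import org.apache.spark.sql.types._

    // Standalone sketch of the conversion; not the Spark-internal code.
    def toHiveRawString(dataType: DataType, value: Any): String = {
      val raw = String.valueOf(value)
      dataType match {
        case DateType       => LocalDate.ofEpochDay(raw.toLong).toString
        case _: DecimalType => BigDecimal(raw).toString()
        case _              => raw
      }
    }

    toHiveRawString(DateType, 16576)            // "2015-05-21"
    toHiveRawString(DecimalType(5, 1), "100.1") // "100.1"
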
http://git-wip-us.apache.org/repos/asf/spark/blob/8161562e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b707f5e..538e661 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -837,4 +837,37 @@ class SQLQuerySuite extends QueryTest {
         java.lang.Math.exp(1.0).toString,
         java.lang.Math.floor(1.9).toString))
   }
+
+  test("dynamic partition value test") {
+    try {
+      sql("set hive.exec.dynamic.partition.mode=nonstrict")
+      // date
+      sql("drop table if exists dynparttest1")
+      sql("create table dynparttest1 (value int) partitioned by (pdate date)")
+      sql(
+        """
+          |insert into table dynparttest1 partition(pdate)
+          | select count(*), cast('2015-05-21' as date) as pdate from src
+        """.stripMargin)
+      checkAnswer(
+        sql("select * from dynparttest1"),
+        Seq(Row(500, java.sql.Date.valueOf("2015-05-21"))))
+
+      // decimal
+      sql("drop table if exists dynparttest2")
+      sql("create table dynparttest2 (value int) partitioned by (pdec decimal(5, 1))")
+      sql(
+        """
+          |insert into table dynparttest2 partition(pdec)
+          | select count(*), cast('100.12' as decimal(5, 1)) as pdec from src
+        """.stripMargin)
+      checkAnswer(
+        sql("select * from dynparttest2"),
+        Seq(Row(500, new java.math.BigDecimal("100.1"))))
+    } finally {
+      sql("drop table if exists dynparttest1")
+      sql("drop table if exists dynparttest2")
+      sql("set hive.exec.dynamic.partition.mode=strict")
+    }
+  }
 }

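For reference, a hedged end-to-end sketch of the user-visible effect, in the
style of the test above (assumes a HiveContext bound to sqlContext and a
populated src table; the table name dyndemo is illustrative):

    sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    sqlContext.sql("create table dyndemo (value int) partitioned by (pdate date)")
    sqlContext.sql(
      """insert into table dyndemo partition(pdate)
        | select count(*), cast('2015-05-21' as date) as pdate from src
      """.stripMargin)
    // Resulting partition directory: .../dyndemo/pdate=2015-05-21
    // (before this patch it would have been pdate=16576)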
