You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/17 00:05:21 UTC

spark git commit: Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types"

Repository: spark
Updated Branches:
  refs/heads/master cb6bd83a9 -> 45ce3273c


Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types"

Author: Michael Armbrust <mi...@databricks.com>

Closes #3292 from marmbrus/revert4309 and squashes the following commits:

808e96e [Michael Armbrust] Revert "[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types"


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45ce3273
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45ce3273
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45ce3273

Branch: refs/heads/master
Commit: 45ce3273cb618d14ec4d20c4c95699634b951086
Parents: cb6bd83
Author: Michael Armbrust <mi...@databricks.com>
Authored: Sun Nov 16 15:05:04 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sun Nov 16 15:05:08 2014 -0800

----------------------------------------------------------------------
 .../thriftserver/HiveThriftServer2Suite.scala   |  90 ++++---------
 .../spark/sql/hive/thriftserver/Shim12.scala    |  11 +-
 .../spark/sql/hive/thriftserver/Shim13.scala    |  29 +++--
 .../org/apache/spark/sql/hive/HiveContext.scala | 127 ++++++++++---------
 4 files changed, 115 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45ce3273/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 23d12cb..bba29b2 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -19,10 +19,9 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.io.File
 import java.net.ServerSocket
-import java.sql.{Date, DriverManager, Statement}
+import java.sql.{DriverManager, Statement}
 import java.util.concurrent.TimeoutException
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise}
@@ -52,15 +51,6 @@ import org.apache.spark.sql.hive.HiveShim
 class HiveThriftServer2Suite extends FunSuite with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)
 
-  object TestData {
-    def getTestDataFilePath(name: String) = {
-      Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name")
-    }
-
-    val smallKv = getTestDataFilePath("small_kv.txt")
-    val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
-  }
-
   def randomListeningPort =  {
     // Let the system to choose a random available port to avoid collision with other parallel
     // builds.
@@ -155,8 +145,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       }
     }
 
-    // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
-    val env = Seq("SPARK_TESTING" -> "0")
+    val env = Seq(
+      // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+      "SPARK_TESTING" -> "0",
+      // Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
+      // proper version information from the jar manifest.
+      "SPARK_PREPEND_CLASSES" -> "")
 
     Process(command, None, env: _*).run(ProcessLogger(
       captureThriftServerOutput("stdout"),
@@ -200,12 +194,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("Test JDBC query execution") {
     withJdbcStatement() { statement =>
-      val queries = Seq(
-        "SET spark.sql.shuffle.partitions=3",
-        "DROP TABLE IF EXISTS test",
-        "CREATE TABLE test(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
-        "CACHE TABLE test")
+      val dataFilePath =
+        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+      val queries =
+        s"""SET spark.sql.shuffle.partitions=3;
+           |CREATE TABLE test(key INT, val STRING);
+           |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
+           |CACHE TABLE test;
+         """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
 
       queries.foreach(statement.execute)
 
@@ -219,10 +216,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("SPARK-3004 regression: result set containing NULL") {
     withJdbcStatement() { statement =>
+      val dataFilePath =
+        Thread.currentThread().getContextClassLoader.getResource(
+          "data/files/small_kv_with_null.txt")
+
       val queries = Seq(
         "DROP TABLE IF EXISTS test_null",
         "CREATE TABLE test_null(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")
+        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_null")
 
       queries.foreach(statement.execute)
 
@@ -269,10 +270,13 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   test("SPARK-4292 regression: result set iterator issue") {
     withJdbcStatement() { statement =>
+      val dataFilePath =
+        Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
       val queries = Seq(
         "DROP TABLE IF EXISTS test_4292",
         "CREATE TABLE test_4292(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")
+        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
 
       queries.foreach(statement.execute)
 
@@ -280,52 +284,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
       Seq(238, 86, 311, 27, 165).foreach { key =>
         resultSet.next()
-        assert(resultSet.getInt(1) === key)
+        assert(resultSet.getInt(1) == key)
       }
 
       statement.executeQuery("DROP TABLE IF EXISTS test_4292")
     }
   }
-
-  test("SPARK-4309 regression: Date type support") {
-    withJdbcStatement() { statement =>
-      val queries = Seq(
-        "DROP TABLE IF EXISTS test_date",
-        "CREATE TABLE test_date(key INT, value STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
-
-      queries.foreach(statement.execute)
-
-      assertResult(Date.valueOf("2011-01-01")) {
-        val resultSet = statement.executeQuery(
-          "SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
-        resultSet.next()
-        resultSet.getDate(1)
-      }
-    }
-  }
-
-  test("SPARK-4407 regression: Complex type support") {
-    withJdbcStatement() { statement =>
-      val queries = Seq(
-        "DROP TABLE IF EXISTS test_map",
-        "CREATE TABLE test_map(key INT, value STRING)",
-        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
-
-      queries.foreach(statement.execute)
-
-      assertResult("""{238:"val_238"}""") {
-        val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
-        resultSet.next()
-        resultSet.getString(1)
-      }
-
-      assertResult("""["238","val_238"]""") {
-        val resultSet = statement.executeQuery(
-          "SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
-        resultSet.next()
-        resultSet.getString(1)
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/45ce3273/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 9258ad0..aa2e3ca 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 import java.util.{ArrayList => JArrayList, Map => JMap}
 
 import scala.collection.JavaConversions._
@@ -131,13 +131,14 @@ private[hive] class SparkExecuteStatementOperation(
         to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
       case ShortType =>
         to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
-      case DateType =>
-        to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
       case TimestampType =>
         to.addColumnValue(
           ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
       case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        val hiveString = result
+          .queryExecution
+          .asInstanceOf[HiveContext#QueryExecution]
+          .toHiveString((from.get(ordinal), dataTypes(ordinal)))
         to.addColumnValue(ColumnValue.stringValue(hiveString))
     }
   }
@@ -162,8 +163,6 @@ private[hive] class SparkExecuteStatementOperation(
         to.addColumnValue(ColumnValue.byteValue(null))
       case ShortType =>
         to.addColumnValue(ColumnValue.shortValue(null))
-      case DateType =>
-        to.addColumnValue(ColumnValue.dateValue(null))
       case TimestampType =>
         to.addColumnValue(ColumnValue.timestampValue(null))
       case BinaryType | _: ArrayType | _: StructType | _: MapType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/45ce3273/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 3c7f62a..a642478 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 import java.util.concurrent.Future
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
@@ -113,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any],  ordinal: Int) {
     dataTypes(ordinal) match {
       case StringType =>
-        to += from.getString(ordinal)
+        to += from.get(ordinal).asInstanceOf[String]
       case IntegerType =>
         to += from.getInt(ordinal)
       case BooleanType =>
@@ -123,20 +123,23 @@ private[hive] class SparkExecuteStatementOperation(
       case FloatType =>
         to += from.getFloat(ordinal)
       case DecimalType() =>
-        to += from.getAs[BigDecimal](ordinal).bigDecimal
+        to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
       case LongType =>
         to += from.getLong(ordinal)
       case ByteType =>
         to += from.getByte(ordinal)
       case ShortType =>
         to += from.getShort(ordinal)
-      case DateType =>
-        to += from.getAs[Date](ordinal)
       case TimestampType =>
-        to +=  from.getAs[Timestamp](ordinal)
-      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-        val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
-        to += hiveString
+        to +=  from.get(ordinal).asInstanceOf[Timestamp]
+      case BinaryType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: ArrayType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: StructType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: MapType =>
+        to += from.get(ordinal).asInstanceOf[String]
     }
   }
 
@@ -144,9 +147,9 @@ private[hive] class SparkExecuteStatementOperation(
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
-    val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+    val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
     if (!iter.hasNext) {
-      resultRowSet
+      reultRowSet
     } else {
       // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
       val maxRows = maxRowsL.toInt
@@ -163,10 +166,10 @@ private[hive] class SparkExecuteStatementOperation(
           }
           curCol += 1
         }
-        resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+        reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
         curRow += 1
       }
-      resultRowSet
+      reultRowSet
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/45ce3273/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index feed64f..e88afaa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -19,27 +19,36 @@ package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.sql.{Date, Timestamp}
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.spark.sql.catalyst.types.DecimalType
+import org.apache.spark.sql.catalyst.types.decimal.Decimal
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
-import scala.reflect.runtime.universe.TypeTag
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
+import org.apache.hadoop.hive.serde2.io.TimestampWritable
+import org.apache.hadoop.hive.serde2.io.DateWritable
 
 import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators}
+import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.types.DecimalType
-import org.apache.spark.sql.catalyst.types.decimal.Decimal
-import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand}
+import org.apache.spark.sql.execution.ExtractPythonUdfs
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.{Command => PhysicalCommand}
 import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
 import org.apache.spark.sql.sources.DataSourceStrategy
 
@@ -127,7 +136,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName))
 
     relation match {
-      case relation: MetastoreRelation =>
+      case relation: MetastoreRelation => {
         // This method is mainly based on
         // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
         // in Hive 0.13 (except that we do not use fs.getContentSummary).
@@ -138,7 +147,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         // countFileSize to count the table size.
         def calculateTableSize(fs: FileSystem, path: Path): Long = {
           val fileStatus = fs.getFileStatus(path)
-          val size = if (fileStatus.isDirectory) {
+          val size = if (fileStatus.isDir) {
             fs.listStatus(path).map(status => calculateTableSize(fs, status.getPath)).sum
           } else {
             fileStatus.getLen
@@ -148,7 +157,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         }
 
         def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
-          val path = table.getPath
+          val path = table.getPath()
           var size: Long = 0L
           try {
             val fs = path.getFileSystem(conf)
@@ -178,14 +187,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
           val hiveTTable = relation.hiveQlTable.getTTable
           hiveTTable.setParameters(tableParameters)
           val tableFullName =
-            relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName
+            relation.hiveQlTable.getDbName() + "." + relation.hiveQlTable.getTableName()
 
           catalog.client.alterTable(tableFullName, new Table(hiveTTable))
         }
+      }
       case otherRelation =>
         throw new NotImplementedError(
           s"Analyze has only implemented for Hive tables, " +
-            s"but $tableName is a ${otherRelation.nodeName}")
+            s"but ${tableName} is a ${otherRelation.nodeName}")
     }
   }
 
@@ -364,6 +374,50 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /** Extends QueryExecution with hive specific features. */
   protected[sql] abstract class QueryExecution extends super.QueryExecution {
 
+    protected val primitiveTypes =
+      Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
+        ShortType, DateType, TimestampType, BinaryType)
+
+    protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
+      case (struct: Row, StructType(fields)) =>
+        struct.zip(fields).map {
+          case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+        }.mkString("{", ",", "}")
+      case (seq: Seq[_], ArrayType(typ, _)) =>
+        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+      case (map: Map[_,_], MapType(kType, vType, _)) =>
+        map.map {
+          case (key, value) =>
+            toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+        }.toSeq.sorted.mkString("{", ",", "}")
+      case (null, _) => "NULL"
+      case (d: Date, DateType) => new DateWritable(d).toString
+      case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
+      case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
+      case (decimal: Decimal, DecimalType()) =>  // Hive strips trailing zeros so use its toString
+        HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString
+      case (other, tpe) if primitiveTypes contains tpe => other.toString
+    }
+
+    /** Hive outputs fields of structs slightly differently than top level attributes. */
+    protected def toHiveStructString(a: (Any, DataType)): String = a match {
+      case (struct: Row, StructType(fields)) =>
+        struct.zip(fields).map {
+          case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+        }.mkString("{", ",", "}")
+      case (seq: Seq[_], ArrayType(typ, _)) =>
+        seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+      case (map: Map[_, _], MapType(kType, vType, _)) =>
+        map.map {
+          case (key, value) =>
+            toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+        }.toSeq.sorted.mkString("{", ",", "}")
+      case (null, _) => "null"
+      case (s: String, StringType) => "\"" + s + "\""
+      case (decimal, DecimalType()) => decimal.toString
+      case (other, tpe) if primitiveTypes contains tpe => other.toString
+    }
+
     /**
      * Returns the result as a hive compatible sequence of strings.  For native commands, the
      * execution is simply passed back to Hive.
@@ -381,7 +435,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         // We need the types so we can output struct field names
         val types = analyzed.output.map(_.dataType)
         // Reformat to match hive tab delimited output.
-        result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq
+        val asString = result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq
+        asString
     }
 
     override def simpleString: String =
@@ -392,49 +447,3 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       }
   }
 }
-
-object HiveContext {
-  protected val primitiveTypes =
-    Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
-      ShortType, DateType, TimestampType, BinaryType)
-
-  protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
-    case (struct: Row, StructType(fields)) =>
-      struct.zip(fields).map {
-        case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
-      }.mkString("{", ",", "}")
-    case (seq: Seq[_], ArrayType(typ, _)) =>
-      seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-    case (map: Map[_,_], MapType(kType, vType, _)) =>
-      map.map {
-        case (key, value) =>
-          toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
-      }.toSeq.sorted.mkString("{", ",", "}")
-    case (null, _) => "NULL"
-    case (d: Date, DateType) => new DateWritable(d).toString
-    case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
-    case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
-    case (decimal: Decimal, DecimalType()) =>  // Hive strips trailing zeros so use its toString
-      HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString
-    case (other, tpe) if primitiveTypes contains tpe => other.toString
-  }
-
-  /** Hive outputs fields of structs slightly differently than top level attributes. */
-  protected def toHiveStructString(a: (Any, DataType)): String = a match {
-    case (struct: Row, StructType(fields)) =>
-      struct.zip(fields).map {
-        case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
-      }.mkString("{", ",", "}")
-    case (seq: Seq[_], ArrayType(typ, _)) =>
-      seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
-    case (map: Map[_, _], MapType(kType, vType, _)) =>
-      map.map {
-        case (key, value) =>
-          toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
-      }.toSeq.sorted.mkString("{", ",", "}")
-    case (null, _) => "null"
-    case (s: String, StringType) => "\"" + s + "\""
-    case (decimal, DecimalType()) => decimal.toString
-    case (other, tpe) if primitiveTypes contains tpe => other.toString
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org