You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/11/01 19:46:47 UTC

spark git commit: [SPARK-17764][SQL] Add `to_json` supporting to convert nested struct column to JSON string

Repository: spark
Updated Branches:
  refs/heads/master cfac17ee1 -> 01dd00830


[SPARK-17764][SQL] Add `to_json` supporting to convert nested struct column to JSON string

## What changes were proposed in this pull request?

This PR proposes to add a `to_json` function, the counterpart of `from_json`, in Scala, Java and Python.

It would be useful to be able to convert the same column to and from JSON. Also, some data sources do not support nested types; if we are forced to save a DataFrame into such a data source, this function offers a workaround.

The usage is as below:

``` scala
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
df.select(to_json($"a").as("json")).show()
```

``` bash
+--------+
|    json|
+--------+
|{"_1":1}|
+--------+
```
## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`.

Author: hyukjinkwon <gu...@gmail.com>

Closes #15354 from HyukjinKwon/SPARK-17764.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01dd0083
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01dd0083
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01dd0083

Branch: refs/heads/master
Commit: 01dd0083011741c2bbe5ae1d2a25f2c9a1302b76
Parents: cfac17e
Author: hyukjinkwon <gu...@gmail.com>
Authored: Tue Nov 1 12:46:41 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Nov 1 12:46:41 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 |  23 +++
 python/pyspark/sql/readwriter.py                |   2 +-
 python/pyspark/sql/streaming.py                 |   2 +-
 .../catalyst/expressions/jsonExpressions.scala  |  48 ++++-
 .../sql/catalyst/json/JacksonGenerator.scala    | 197 ++++++++++++++++++
 .../spark/sql/catalyst/json/JacksonUtils.scala  |  26 +++
 .../expressions/JsonExpressionsSuite.scala      |   9 +
 .../scala/org/apache/spark/sql/Dataset.scala    |   2 +-
 .../datasources/json/JacksonGenerator.scala     | 198 -------------------
 .../datasources/json/JsonFileFormat.scala       |   2 +-
 .../scala/org/apache/spark/sql/functions.scala  |  44 ++++-
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  30 ++-
 12 files changed, 372 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 7fa3fd2..45e3c22 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1744,6 +1744,29 @@ def from_json(col, schema, options={}):
     return Column(jc)
 
 
+@ignore_unicode_prefix
+@since(2.1)
+def to_json(col, options={}):
+    """
+    Converts a column containing a :class:`StructType` into a JSON string. Throws an exception,
+    in the case of an unsupported type.
+
+    :param col: name of column containing the struct
+    :param options: options to control converting. accepts the same options as the json datasource
+
+    >>> from pyspark.sql import Row
+    >>> from pyspark.sql.types import *
+    >>> data = [(1, Row(name='Alice', age=2))]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json=u'{"age":2,"name":"Alice"}')]
+    """
+
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.to_json(_to_java_column(col), options)
+    return Column(jc)
+
+
 @since(1.5)
 def size(col):
     """

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bc786ef..b0c51b1 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -161,7 +161,7 @@ class DataFrameReader(OptionUtils):
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
         """
         Loads a JSON file (`JSON Lines text format or newline-delimited JSON
-        <[http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
+        <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
         record) and returns the result as a :class`DataFrame`.
 
         If the ``schema`` parameter is not specified, this function goes

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 559647b..1c94413 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -641,7 +641,7 @@ class DataStreamReader(OptionUtils):
              timestampFormat=None):
         """
         Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
-        <[http://jsonlines.org/>`_) and returns a :class`DataFrame`.
+        <http://jsonlines.org/>`_) and returns a :class`DataFrame`.
 
         If the ``schema`` parameter is not specified, this function goes
         through the input once to determine the input schema.

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 65dbd6a..244a5a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -17,16 +17,17 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.io.{ByteArrayOutputStream, StringWriter}
+import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}
 
 import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.util.ParseModes
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -494,3 +495,46 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
 }
+
+/**
+ * Converts a [[StructType]] to a json output string. A null struct yields a null result.
+ */
+case class StructToJson(options: Map[String, String], child: Expression)
+  extends Expression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  @transient lazy val writer = new CharArrayWriter()
+
+  // The user-supplied options (e.g. timestampFormat) must reach the generator to take effect.
+  @transient lazy val gen =
+    new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer, new JSONOptions(options))
+
+  override def dataType: DataType = StringType
+  override def children: Seq[Expression] = child :: Nil
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (StructType.acceptsType(child.dataType)) {
+      try {
+        JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
+        TypeCheckResult.TypeCheckSuccess
+      } catch {
+        case e: UnsupportedOperationException => TypeCheckResult.TypeCheckFailure(e.getMessage)
+      }
+    } else {
+      TypeCheckResult.TypeCheckFailure(
+        s"$prettyName requires that the expression is a struct expression.")
+    }
+  }
+
+  override def eval(input: InternalRow): Any = child.eval(input) match {
+    case null => null // a null struct has no fields to write; propagate the null
+    case row: InternalRow =>
+      gen.write(row)
+      gen.flush()
+      val json = writer.toString
+      writer.reset()
+      UTF8String.fromString(json)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
new file mode 100644
index 0000000..4b548e0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.Writer
+
+import com.fasterxml.jackson.core._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
+import org.apache.spark.sql.types._
+
+private[sql] class JacksonGenerator(
+    schema: StructType,
+    writer: Writer,
+    options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
+  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
+  // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
+  // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
+  private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+  // `ValueWriter`s for all fields of the schema
+  private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
+
+  private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+  private def makeWriter(dataType: DataType): ValueWriter = dataType match {
+    case NullType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNull()
+
+    case BooleanType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeBoolean(row.getBoolean(ordinal))
+
+    case ByteType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getByte(ordinal))
+
+    case ShortType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getShort(ordinal))
+
+    case IntegerType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getInt(ordinal))
+
+    case LongType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getLong(ordinal))
+
+    case FloatType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getFloat(ordinal))
+
+    case DoubleType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getDouble(ordinal))
+
+    case StringType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeString(row.getUTF8String(ordinal).toString)
+
+    case TimestampType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val timestampString =
+          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
+        gen.writeString(timestampString)
+
+    case DateType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val dateString =
+          options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
+        gen.writeString(dateString)
+
+    case BinaryType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeBinary(row.getBinary(ordinal))
+
+    case dt: DecimalType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal)
+
+    case st: StructType =>
+      val fieldWriters = st.map(_.dataType).map(makeWriter)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters))
+
+    case at: ArrayType =>
+      val elementWriter = makeWriter(at.elementType)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeArray(writeArrayData(row.getArray(ordinal), elementWriter))
+
+    case mt: MapType =>
+      val valueWriter = makeWriter(mt.valueType)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter))
+
+    // UDT values should already be in the SQL type's corresponding value type.
+    // We should not see values of the user-defined class here.
+    // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
+    // an ArrayData here, instead of a Vector.
+    case t: UserDefinedType[_] =>
+      makeWriter(t.sqlType)
+    // No JSON mapping exists for any other type; fail only once a value is actually written.
+    case _ =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val v = row.get(ordinal, dataType)
+        sys.error(s"Failed to convert value $v (class of ${v.getClass}) " +
+          s"with the type of $dataType to JSON.")
+  }
+
+  private def writeObject(f: => Unit): Unit = {
+    gen.writeStartObject()
+    f
+    gen.writeEndObject()
+  }
+
+  private def writeFields(
+      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+    var i = 0
+    while (i < row.numFields) {
+      val field = schema(i)
+      if (!row.isNullAt(i)) {
+        gen.writeFieldName(field.name)
+        fieldWriters(i).apply(row, i)
+      }
+      i += 1
+    }
+  }
+
+  private def writeArray(f: => Unit): Unit = {
+    gen.writeStartArray()
+    f
+    gen.writeEndArray()
+  }
+
+  private def writeArrayData(
+      array: ArrayData, fieldWriter: ValueWriter): Unit = {
+    var i = 0
+    while (i < array.numElements()) {
+      if (!array.isNullAt(i)) {
+        fieldWriter.apply(array, i)
+      } else {
+        gen.writeNull()
+      }
+      i += 1
+    }
+  }
+
+  private def writeMapData(
+      map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
+    val keyArray = map.keyArray()
+    val valueArray = map.valueArray()
+    var i = 0
+    while (i < map.numElements()) {
+      gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
+      if (!valueArray.isNullAt(i)) {
+        fieldWriter.apply(valueArray, i)
+      } else {
+        gen.writeNull()
+      }
+      i += 1
+    }
+  }
+
+  def close(): Unit = gen.close()
+
+  def flush(): Unit = gen.flush()
+
+  /**
+   * Transforms a single InternalRow to JSON using Jackson
+   *
+   * @param row The row to convert
+   */
+  def write(row: InternalRow): Unit = {
+    writeObject {
+      writeFields(row, schema, rootFieldWriters)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index c4d9abb..3b23c6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
+import org.apache.spark.sql.types._
+
 object JacksonUtils {
   /**
    * Advance the parser until a null or a specific token is found
@@ -29,4 +31,28 @@ object JacksonUtils {
       case x => x != stopOn
     }
   }
+
+  /**
+   * Verify that every field of the schema (recursively) can be written out as JSON.
+   * Throws `UnsupportedOperationException` naming the offending column otherwise.
+   */
+  def verifySchema(schema: StructType): Unit = {
+    def verifyType(name: String, dataType: DataType): Unit = dataType match {
+      case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
+           DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>
+        // Atomic types that the JSON generator writes directly; nothing further to check.
+      case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))
+      case at: ArrayType => verifyType(name, at.elementType)
+      // Map keys are written via `toString`, so only the value type has to be convertible.
+      case mt: MapType => verifyType(name, mt.valueType)
+
+      case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)
+
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Unable to convert column $name of type ${dataType.simpleString} to JSON.")
+    }
+
+    schema.foreach(field => verifyType(field.name, field.dataType))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 8462393..f9db649 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -343,4 +343,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       null
     )
   }
+
+  test("to_json") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val struct = Literal.create(create_row(1), schema)
+    checkEvaluation(
+      StructToJson(Map.empty, struct),
+      """{"a":1}"""
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6e0a247..eb2b20a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.json.JacksonGenerator
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -45,7 +46,6 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.DataStreamWriter
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
deleted file mode 100644
index 5b55b70..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import java.io.Writer
-
-import com.fasterxml.jackson.core._
-
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.JSONOptions
-import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
-import org.apache.spark.sql.types._
-
-private[sql] class JacksonGenerator(
-    schema: StructType,
-    writer: Writer,
-    options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
-  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
-  // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
-  // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
-  private type ValueWriter = (SpecializedGetters, Int) => Unit
-
-  // `ValueWriter`s for all fields of the schema
-  private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
-
-  private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
-
-  private def makeWriter(dataType: DataType): ValueWriter = dataType match {
-    case NullType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeNull()
-
-    case BooleanType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeBoolean(row.getBoolean(ordinal))
-
-    case ByteType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeNumber(row.getByte(ordinal))
-
-    case ShortType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeNumber(row.getShort(ordinal))
-
-    case IntegerType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeNumber(row.getInt(ordinal))
-
-    case LongType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeNumber(row.getLong(ordinal))
-
-    case FloatType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeNumber(row.getFloat(ordinal))
-
-    case DoubleType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeNumber(row.getDouble(ordinal))
-
-    case StringType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeString(row.getUTF8String(ordinal).toString)
-
-    case TimestampType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        val timestampString =
-          options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
-        gen.writeString(timestampString)
-
-    case DateType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        val dateString =
-          options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
-        gen.writeString(dateString)
-
-    case BinaryType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeBinary(row.getBinary(ordinal))
-
-    case dt: DecimalType =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal)
-
-    case st: StructType =>
-      val fieldWriters = st.map(_.dataType).map(makeWriter)
-      (row: SpecializedGetters, ordinal: Int) =>
-        writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters))
-
-    case at: ArrayType =>
-      val elementWriter = makeWriter(at.elementType)
-      (row: SpecializedGetters, ordinal: Int) =>
-        writeArray(writeArrayData(row.getArray(ordinal), elementWriter))
-
-    case mt: MapType =>
-      val valueWriter = makeWriter(mt.valueType)
-      (row: SpecializedGetters, ordinal: Int) =>
-        writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter))
-
-    // For UDT values, they should be in the SQL type's corresponding value type.
-    // We should not see values in the user-defined class at here.
-    // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
-    // an ArrayData at here, instead of a Vector.
-    case t: UserDefinedType[_] =>
-      makeWriter(t.sqlType)
-
-    case _ =>
-      (row: SpecializedGetters, ordinal: Int) =>
-        val v = row.get(ordinal, dataType)
-        sys.error(s"Failed to convert value $v (class of ${v.getClass}}) " +
-          s"with the type of $dataType to JSON.")
-  }
-
-  private def writeObject(f: => Unit): Unit = {
-    gen.writeStartObject()
-    f
-    gen.writeEndObject()
-  }
-
-  private def writeFields(
-      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
-    var i = 0
-    while (i < row.numFields) {
-      val field = schema(i)
-      if (!row.isNullAt(i)) {
-        gen.writeFieldName(field.name)
-        fieldWriters(i).apply(row, i)
-      }
-      i += 1
-    }
-  }
-
-  private def writeArray(f: => Unit): Unit = {
-    gen.writeStartArray()
-    f
-    gen.writeEndArray()
-  }
-
-  private def writeArrayData(
-      array: ArrayData, fieldWriter: ValueWriter): Unit = {
-    var i = 0
-    while (i < array.numElements()) {
-      if (!array.isNullAt(i)) {
-        fieldWriter.apply(array, i)
-      } else {
-        gen.writeNull()
-      }
-      i += 1
-    }
-  }
-
-  private def writeMapData(
-      map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
-    val keyArray = map.keyArray()
-    val valueArray = map.valueArray()
-    var i = 0
-    while (i < map.numElements()) {
-      gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
-      if (!valueArray.isNullAt(i)) {
-        fieldWriter.apply(valueArray, i)
-      } else {
-        gen.writeNull()
-      }
-      i += 1
-    }
-  }
-
-  def close(): Unit = gen.close()
-
-  def flush(): Unit = gen.flush()
-
-  /**
-   * Transforms a single InternalRow to JSON using Jackson
-   *
-   * @param row The row to convert
-   */
-  def write(row: InternalRow): Unit = {
-    writeObject {
-      writeFields(row, schema, rootFieldWriters)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 5a409c0..0e38aef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextOutputWriter

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 5f1efd2..944a476 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2883,10 +2883,10 @@ object functions {
    * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
    * specified schema. Returns `null`, in the case of an unparseable string.
    *
+   * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string
    * @param options options to control how the json is parsed. accepts the same options and the
    *                json data source.
-   * @param e a string column containing JSON data.
    *
    * @group collection_funcs
    * @since 2.1.0
@@ -2936,6 +2936,48 @@ object functions {
   def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
     from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
 
+
+  /**
+   * (Scala-specific) Converts a column containing a [[StructType]] into a JSON string.
+   * Throws an exception, in the case of an unsupported type.
+   *
+   * @param e a struct column.
+   * @param options options to control how the struct column is converted into a json string.
+   *                accepts the same options as the json data source.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def to_json(e: Column, options: Map[String, String]): Column = withExpr {
+    StructToJson(options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Converts a column containing a [[StructType]] into a JSON string.
+   * Throws an exception, in the case of an unsupported type.
+   *
+   * @param e a struct column.
+   * @param options options to control how the struct column is converted into a json string.
+   *                accepts the same options as the json data source.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def to_json(e: Column, options: java.util.Map[String, String]): Column =
+    to_json(e, options.asScala.toMap)
+
+  /**
+   * Converts a column containing a [[StructType]] into a JSON string using the default
+   * options. Throws an exception, in the case of an unsupported type.
+   *
+   * @param e a struct column.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def to_json(e: Column): Column =
+    to_json(e, Map.empty[String, String])
+
   /**
    * Returns length of array or map.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 518d6e9..59ae889 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.functions.from_json
+import org.apache.spark.sql.functions.{from_json, struct, to_json}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types.{CalendarIntervalType, IntegerType, StructType}
 
 class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -31,7 +31,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       Row("alice", "5"))
   }
 
-
   val tuples: Seq[(String, String)] =
     ("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
     ("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
@@ -97,7 +96,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(expr, expected)
   }
 
-  test("json_parser") {
+  test("from_json") {
     val df = Seq("""{"a": 1}""").toDS()
     val schema = new StructType().add("a", IntegerType)
 
@@ -106,7 +105,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(Row(1)) :: Nil)
   }
 
-  test("json_parser missing columns") {
+  test("from_json missing columns") {
     val df = Seq("""{"a": 1}""").toDS()
     val schema = new StructType().add("b", IntegerType)
 
@@ -115,7 +114,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(Row(null)) :: Nil)
   }
 
-  test("json_parser invalid json") {
+  test("from_json invalid json") {
     val df = Seq("""{"a" 1}""").toDS()
     val schema = new StructType().add("a", IntegerType)
 
@@ -123,4 +122,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       df.select(from_json($"value", schema)),
       Row(null) :: Nil)
   }
+
+  test("to_json") {
+    val df = Seq(Tuple1(Tuple1(1))).toDF("a")
+
+    checkAnswer(
+      df.select(to_json($"a")),
+      Row("""{"_1":1}""") :: Nil)
+  }
+
+  test("to_json unsupported type") {
+    val df = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a")
+      .select(struct($"a._1".cast(CalendarIntervalType).as("a")).as("c"))
+    val e = intercept[AnalysisException]{
+      // Unsupported type throws an exception
+      df.select(to_json($"c")).collect()
+    }
+    assert(e.getMessage.contains(
+      "Unable to convert column a of type calendarinterval to JSON."))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org