You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/03/17 21:52:03 UTC

spark git commit: [SPARK-19967][SQL] Add from_json in FunctionRegistry

Repository: spark
Updated Branches:
  refs/heads/master bfdeea5c6 -> 7de66bae5


[SPARK-19967][SQL] Add from_json in FunctionRegistry

## What changes were proposed in this pull request?
This PR adds an entry for `from_json` to `FunctionRegistry` so that `from_json` is supported in SQL.

## How was this patch tested?
Added tests in `JsonFunctionsSuite` and `SQLQueryTestSuite`.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #17320 from maropu/SPARK-19967.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7de66bae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7de66bae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7de66bae

Branch: refs/heads/master
Commit: 7de66bae58733595cb88ec899640f7acf734d5c4
Parents: bfdeea5
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Fri Mar 17 14:51:59 2017 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Fri Mar 17 14:51:59 2017 -0700

----------------------------------------------------------------------
 .../catalyst/analysis/FunctionRegistry.scala    |   1 +
 .../catalyst/expressions/jsonExpressions.scala  |  36 ++++++-
 .../sql-tests/inputs/json-functions.sql         |  13 +++
 .../sql-tests/results/json-functions.sql.out    | 107 ++++++++++++++++++-
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  36 +++++++
 5 files changed, 189 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7de66bae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 0dcb440..0486e67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -426,6 +426,7 @@ object FunctionRegistry {
 
     // json
     expression[StructToJson]("to_json"),
+    expression[JsonToStruct]("from_json"),
 
     // Cast aliases (SPARK-16730)
     castAlias("boolean", BooleanType),

http://git-wip-us.apache.org/repos/asf/spark/blob/7de66bae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 18b5f2f..37e4bb5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
@@ -483,6 +484,17 @@ case class JsonTuple(children: Seq[Expression])
 /**
  * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
  */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
+       {"a":1, "b":0.8}
+      > SELECT _FUNC_('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":"2015-08-26 00:00:00.0"}
+  """)
+// scalastyle:on line.size.limit
 case class JsonToStruct(
     schema: DataType,
     options: Map[String, String],
@@ -494,6 +506,21 @@ case class JsonToStruct(
   def this(schema: DataType, options: Map[String, String], child: Expression) =
     this(schema, options, child, None)
 
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression) =
+    this(
+      schema = JsonExprUtils.validateSchemaLiteral(schema),
+      options = Map.empty[String, String],
+      child = child,
+      timeZoneId = None)
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = JsonExprUtils.validateSchemaLiteral(schema),
+      options = JsonExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
   override def checkInputDataTypes(): TypeCheckResult = schema match {
     case _: StructType | ArrayType(_: StructType, _) =>
       super.checkInputDataTypes()
@@ -589,7 +616,7 @@ case class StructToJson(
   def this(child: Expression) = this(Map.empty, child, None)
   def this(child: Expression, options: Expression) =
     this(
-      options = StructToJson.convertToMapData(options),
+      options = JsonExprUtils.convertToMapData(options),
       child = child,
       timeZoneId = None)
 
@@ -634,7 +661,12 @@ case class StructToJson(
   override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
 }
 
-object StructToJson {
+object JsonExprUtils {
+
+  def validateSchemaLiteral(exp: Expression): StructType = exp match {
+    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+    case e => throw new AnalysisException(s"Expected a string literal instead of $e")
+  }
 
   def convertToMapData(exp: Expression): Map[String, String] = exp match {
     case m: CreateMap

http://git-wip-us.apache.org/repos/asf/spark/blob/7de66bae/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 9308560..83243c5 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -5,4 +5,17 @@ select to_json(named_struct('a', 1, 'b', 2));
 select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
 -- Check if errors handled
 select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
+select to_json(named_struct('a', 1, 'b', 2), map('mode', 1));
 select to_json();
+
+-- from_json
+describe function from_json;
+describe function extended from_json;
+select from_json('{"a":1}', 'a INT');
+select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+-- Check if errors handled
+select from_json('{"a":1}', 1);
+select from_json('{"a":1}', 'a InvalidType');
+select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE'));
+select from_json('{"a":1}', 'a INT', map('mode', 1));
+select from_json();

http://git-wip-us.apache.org/repos/asf/spark/blob/7de66bae/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index d8aa4fb..b57cbbc 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 6
+-- Number of queries: 16
 
 
 -- !query 0
@@ -55,9 +55,112 @@ Must use a map() function for options;; line 1 pos 7
 
 
 -- !query 5
-select to_json()
+select to_json(named_struct('a', 1, 'b', 2), map('mode', 1))
 -- !query 5 schema
 struct<>
 -- !query 5 output
 org.apache.spark.sql.AnalysisException
+A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
+
+
+-- !query 6
+select to_json()
+-- !query 6 schema
+struct<>
+-- !query 6 output
+org.apache.spark.sql.AnalysisException
 Invalid number of arguments for function to_json; line 1 pos 7
+
+
+-- !query 7
+describe function from_json
+-- !query 7 schema
+struct<function_desc:string>
+-- !query 7 output
+Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
+Function: from_json
+Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
+
+
+-- !query 8
+describe function extended from_json
+-- !query 8 schema
+struct<function_desc:string>
+-- !query 8 output
+Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
+Extended Usage:
+    Examples:
+      > SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE');
+       {"a":1, "b":0.8}
+      > SELECT from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":"2015-08-26 00:00:00.0"}
+  
+Function: from_json
+Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
+
+
+-- !query 9
+select from_json('{"a":1}', 'a INT')
+-- !query 9 schema
+struct<jsontostruct({"a":1}):struct<a:int>>
+-- !query 9 output
+{"a":1}
+
+
+-- !query 10
+select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy'))
+-- !query 10 schema
+struct<jsontostruct({"time":"26/08/2015"}):struct<time:timestamp>>
+-- !query 10 output
+{"time":2015-08-26 00:00:00.0}
+
+
+-- !query 11
+select from_json('{"a":1}', 1)
+-- !query 11 schema
+struct<>
+-- !query 11 output
+org.apache.spark.sql.AnalysisException
+Expected a string literal instead of 1;; line 1 pos 7
+
+
+-- !query 12
+select from_json('{"a":1}', 'a InvalidType')
+-- !query 12 schema
+struct<>
+-- !query 12 output
+org.apache.spark.sql.AnalysisException
+
+DataType invalidtype() is not supported.(line 1, pos 2)
+
+== SQL ==
+a InvalidType
+--^^^
+; line 1 pos 7
+
+
+-- !query 13
+select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE'))
+-- !query 13 schema
+struct<>
+-- !query 13 output
+org.apache.spark.sql.AnalysisException
+Must use a map() function for options;; line 1 pos 7
+
+
+-- !query 14
+select from_json('{"a":1}', 'a INT', map('mode', 1))
+-- !query 14 schema
+struct<>
+-- !query 14 output
+org.apache.spark.sql.AnalysisException
+A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7
+
+
+-- !query 15
+select from_json()
+-- !query 15 schema
+struct<>
+-- !query 15 output
+org.apache.spark.sql.AnalysisException
+Invalid number of arguments for function from_json; line 1 pos 7

http://git-wip-us.apache.org/repos/asf/spark/blob/7de66bae/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index cdea3b9..2345b82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -220,4 +220,40 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(errMsg2.getMessage.startsWith(
       "A type of keys and values in map() must be string, but got"))
   }
+
+  test("SPARK-19967 Support from_json in SQL") {
+    val df1 = Seq("""{"a": 1}""").toDS()
+    checkAnswer(
+      df1.selectExpr("from_json(value, 'a INT')"),
+      Row(Row(1)) :: Nil)
+
+    val df2 = Seq("""{"c0": "a", "c1": 1, "c2": {"c20": 3.8, "c21": 8}}""").toDS()
+    checkAnswer(
+      df2.selectExpr("from_json(value, 'c0 STRING, c1 INT, c2 STRUCT<c20: DOUBLE, c21: INT>')"),
+      Row(Row("a", 1, Row(3.8, 8))) :: Nil)
+
+    val df3 = Seq("""{"time": "26/08/2015 18:00"}""").toDS()
+    checkAnswer(
+      df3.selectExpr(
+        "from_json(value, 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy HH:mm'))"),
+      Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
+
+    val errMsg1 = intercept[AnalysisException] {
+      df3.selectExpr("from_json(value, 1)")
+    }
+    assert(errMsg1.getMessage.startsWith("Expected a string literal instead of"))
+    val errMsg2 = intercept[AnalysisException] {
+      df3.selectExpr("""from_json(value, 'time InvalidType')""")
+    }
+    assert(errMsg2.getMessage.contains("DataType invalidtype() is not supported"))
+    val errMsg3 = intercept[AnalysisException] {
+      df3.selectExpr("from_json(value, 'time Timestamp', named_struct('a', 1))")
+    }
+    assert(errMsg3.getMessage.startsWith("Must use a map() function for options"))
+    val errMsg4 = intercept[AnalysisException] {
+      df3.selectExpr("from_json(value, 'time Timestamp', map('a', 1))")
+    }
+    assert(errMsg4.getMessage.startsWith(
+      "A type of keys and values in map() must be string, but got"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org