Posted to commits@spark.apache.org by we...@apache.org on 2017/02/23 02:03:35 UTC

spark git commit: [SPARK-15615][SQL] Add an API to load DataFrame from Dataset[String] storing JSON

Repository: spark
Updated Branches:
  refs/heads/master dc005ed53 -> d3147502e


[SPARK-15615][SQL] Add an API to load DataFrame from Dataset[String] storing JSON

## What changes were proposed in this pull request?

SPARK-15615 proposes replacing sqlContext.read.json(rdd) with a Dataset-based equivalent.
SPARK-15463 added a CSV API for reading from Dataset[String], so this keeps the two APIs consistent.
I am deprecating the existing RDD-based APIs.
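
For illustration, a minimal before/after sketch (the SparkSession setup and sample data here are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Deprecated as of 2.2.0: the RDD-based overloads.
val dfOld = spark.read.json(spark.sparkContext.parallelize(Seq("""{"a": 1}""")))

// New in 2.2.0: pass a Dataset[String] directly, mirroring the CSV API from SPARK-15463.
val dfNew = spark.read.json(Seq("""{"a": 1}""").toDS())
```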

## How was this patch tested?

Existing tests cover this change. I left most tests using the existing APIs, since they delegate to the new json API.


Author: pj.fanning <pj...@workday.com>
Author: PJ Fanning <pj...@users.noreply.github.com>

Closes #16895 from pjfanning/SPARK-15615.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3147502
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3147502
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3147502

Branch: refs/heads/master
Commit: d3147502e7837d81e27193164b3513bb69fa3797
Parents: dc005ed
Author: pj.fanning <pj...@workday.com>
Authored: Wed Feb 22 18:03:25 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Feb 22 18:03:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameReader.scala  | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3147502/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index cb9493a..4c1341e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -323,6 +323,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param jsonRDD input RDD with one JSON object per record
    * @since 1.4.0
    */
+  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
   def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)
 
   /**
@@ -335,7 +336,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param jsonRDD input RDD with one JSON object per record
    * @since 1.4.0
    */
+  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
   def json(jsonRDD: RDD[String]): DataFrame = {
+    json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
+  }
+
+  /**
+   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
+   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+   *
+   * Unless the schema is specified using `schema` function, this function goes through the
+   * input once to determine the input schema.
+   *
+   * @param jsonDataset input Dataset with one JSON object per record
+   * @since 2.2.0
+   */
+  def json(jsonDataset: Dataset[String]): DataFrame = {
     val parsedOptions = new JSONOptions(
       extraOptions.toMap,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
@@ -344,12 +360,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
     val schema = userSpecifiedSchema.getOrElse {
       JsonInferSchema.infer(
-        jsonRDD,
+        jsonDataset.rdd,
         parsedOptions,
         createParser)
     }
 
-    val parsed = jsonRDD.mapPartitions { iter =>
+    val parsed = jsonDataset.rdd.mapPartitions { iter =>
       val parser = new JacksonParser(schema, parsedOptions)
       iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
     }
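
As the diff shows, the deprecated RDD overload now simply wraps its input in a Dataset[String] and delegates to the new method. A minimal caller-side sketch of the same wrapping, assuming an existing SparkSession `spark` and an RDD[String] `jsonRDD`:

```scala
import org.apache.spark.sql.Encoders

// Equivalent to the deprecated spark.read.json(jsonRDD): wrap the RDD
// in a Dataset[String] and call the new 2.2.0 overload directly.
val df = spark.read.json(spark.createDataset(jsonRDD)(Encoders.STRING))
```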

