You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/07/10 17:40:07 UTC

spark git commit: [SPARK-21266][R][PYTHON] Support schema a DDL-formatted string in dapply/gapply/from_json

Repository: spark
Updated Branches:
  refs/heads/master 18b3b00ec -> 2bfd5accd


[SPARK-21266][R][PYTHON] Support schema a DDL-formatted string in dapply/gapply/from_json

## What changes were proposed in this pull request?

This PR supports schema in a DDL formatted string for `from_json` in R/Python and `dapply` and `gapply` in R, which are commonly used and/or consistent with Scala APIs.

Additionally, this PR exposes `structType` in R to allow working around in other possible corner cases.

**Python**

`from_json`

```python
from pyspark.sql.functions import from_json

data = [(1, '''{"a": 1}''')]
df = spark.createDataFrame(data, ("key", "value"))
df.select(from_json(df.value, "a INT").alias("json")).show()
```

**R**

`from_json`

```R
df <- sql("SELECT named_struct('name', 'Bob') as people")
df <- mutate(df, people_json = to_json(df$people))
head(select(df, from_json(df$people_json, "name STRING")))
```

`structType.character`

```R
structType("a STRING, b INT")
```

`dapply`

```R
dapply(createDataFrame(list(list(1.0)), "a"), function(x) {x}, "a DOUBLE")
```

`gapply`

```R
gapply(createDataFrame(list(list(1.0)), "a"), "a", function(key, x) { x }, "a DOUBLE")
```

## How was this patch tested?

Doc tests for `from_json` in Python and unit tests `test_sparkSQL.R` in R.

Author: hyukjinkwon <gu...@gmail.com>

Closes #18498 from HyukjinKwon/SPARK-21266.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bfd5acc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bfd5acc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bfd5acc

Branch: refs/heads/master
Commit: 2bfd5accdce2ae31feeeddf213a019cf8ec97663
Parents: 18b3b00
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Jul 10 10:40:03 2017 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Mon Jul 10 10:40:03 2017 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |   2 +
 R/pkg/R/DataFrame.R                             |  36 ++++-
 R/pkg/R/functions.R                             |  12 +-
 R/pkg/R/group.R                                 |   3 +
 R/pkg/R/schema.R                                |  29 +++-
 R/pkg/tests/fulltests/test_sparkSQL.R           | 136 +++++++++++--------
 python/pyspark/sql/functions.py                 |  11 +-
 .../scala/org/apache/spark/sql/functions.scala  |   7 +-
 8 files changed, 160 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index b7fdae5..232f5cf 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -429,6 +429,7 @@ export("structField",
        "structField.character",
        "print.structField",
        "structType",
+       "structType.character",
        "structType.jobj",
        "structType.structField",
        "print.structType")
@@ -465,5 +466,6 @@ S3method(print, summary.GBTRegressionModel)
 S3method(print, summary.GBTClassificationModel)
 S3method(structField, character)
 S3method(structField, jobj)
+S3method(structType, character)
 S3method(structType, jobj)
 S3method(structType, structField)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3b9d42d..e7a166c 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1391,6 +1391,10 @@ setMethod("summarize",
           })
 
 dapplyInternal <- function(x, func, schema) {
+  if (is.character(schema)) {
+    schema <- structType(schema)
+  }
+
   packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                connection = NULL)
 
@@ -1408,6 +1412,8 @@ dapplyInternal <- function(x, func, schema) {
   dataFrame(sdf)
 }
 
+setClassUnion("characterOrstructType", c("character", "structType"))
+
 #' dapply
 #'
 #' Apply a function to each partition of a SparkDataFrame.
@@ -1418,10 +1424,11 @@ dapplyInternal <- function(x, func, schema) {
 #'             to each partition will be passed.
 #'             The output of func should be a R data.frame.
 #' @param schema The schema of the resulting SparkDataFrame after the function is applied.
-#'               It must match the output of func.
+#'               It must match the output of func. Since Spark 2.3, the DDL-formatted string
+#'               is also supported for the schema.
 #' @family SparkDataFrame functions
 #' @rdname dapply
-#' @aliases dapply,SparkDataFrame,function,structType-method
+#' @aliases dapply,SparkDataFrame,function,characterOrstructType-method
 #' @name dapply
 #' @seealso \link{dapplyCollect}
 #' @export
@@ -1444,6 +1451,17 @@ dapplyInternal <- function(x, func, schema) {
 #'              y <- cbind(y, y[1] + 1L)
 #'            },
 #'            schema)
+#'
+#'   # The schema also can be specified in a DDL-formatted string.
+#'   schema <- "a INT, d DOUBLE, c STRING, d INT"
+#'   df1 <- dapply(
+#'            df,
+#'            function(x) {
+#'              y <- x[x[1] > 1, ]
+#'              y <- cbind(y, y[1] + 1L)
+#'            },
+#'            schema)
+#'
 #'   collect(df1)
 #'   # the result
 #'   #       a b c d
@@ -1452,7 +1470,7 @@ dapplyInternal <- function(x, func, schema) {
 #' }
 #' @note dapply since 2.0.0
 setMethod("dapply",
-          signature(x = "SparkDataFrame", func = "function", schema = "structType"),
+          signature(x = "SparkDataFrame", func = "function", schema = "characterOrstructType"),
           function(x, func, schema) {
             dapplyInternal(x, func, schema)
           })
@@ -1522,6 +1540,7 @@ setMethod("dapplyCollect",
 #' @param schema the schema of the resulting SparkDataFrame after the function is applied.
 #'               The schema must match to output of \code{func}. It has to be defined for each
 #'               output column with preferred output column name and corresponding data type.
+#'               Since Spark 2.3, the DDL-formatted string is also supported for the schema.
 #' @return A SparkDataFrame.
 #' @family SparkDataFrame functions
 #' @aliases gapply,SparkDataFrame-method
@@ -1541,7 +1560,7 @@ setMethod("dapplyCollect",
 #'
 #' Here our output contains three columns, the key which is a combination of two
 #' columns with data types integer and string and the mean which is a double.
-#' schema <-  structType(structField("a", "integer"), structField("c", "string"),
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
 #'   structField("avg", "double"))
 #' result <- gapply(
 #'   df,
@@ -1550,6 +1569,15 @@ setMethod("dapplyCollect",
 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
 #' }, schema)
 #'
+#' The schema also can be specified in a DDL-formatted string.
+#' schema <- "a INT, c STRING, avg DOUBLE"
+#' result <- gapply(
+#'   df,
+#'   c("a", "c"),
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#' }, schema)
+#'
 #' We can also group the data and afterwards call gapply on GroupedData.
 #' For Example:
 #' gdf <- group_by(df, "a", "c")

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index f28d26a..86507f1 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2174,8 +2174,9 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #'
 #' @rdname column_collection_functions
 #' @param schema a structType object to use as the schema to use when parsing the JSON string.
+#'               Since Spark 2.3, the DDL-formatted string is also supported for the schema.
 #' @param as.json.array indicating if input string is JSON array of objects or a single object.
-#' @aliases from_json from_json,Column,structType-method
+#' @aliases from_json from_json,Column,characterOrstructType-method
 #' @export
 #' @examples
 #'
@@ -2188,10 +2189,15 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))
 #' schema <- structType(structField("name", "string"))
-#' head(select(df2, from_json(df2$people_json, schema)))}
+#' head(select(df2, from_json(df2$people_json, schema)))
+#' head(select(df2, from_json(df2$people_json, "name STRING")))}
 #' @note from_json since 2.2.0
-setMethod("from_json", signature(x = "Column", schema = "structType"),
+setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
           function(x, schema, as.json.array = FALSE, ...) {
+            if (is.character(schema)) {
+              schema <- structType(schema)
+            }
+
             if (as.json.array) {
               jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
                                      "createArrayType",

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/group.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 17f5283..0a7be0e 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -233,6 +233,9 @@ setMethod("gapplyCollect",
           })
 
 gapplyInternal <- function(x, func, schema) {
+  if (is.character(schema)) {
+    schema <- structType(schema)
+  }
   packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                        connection = NULL)
   broadcastArr <- lapply(ls(.broadcastNames),

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/schema.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/schema.R b/R/pkg/R/schema.R
index cb5bdb9..d1ed683 100644
--- a/R/pkg/R/schema.R
+++ b/R/pkg/R/schema.R
@@ -23,18 +23,24 @@
 #' Create a structType object that contains the metadata for a SparkDataFrame. Intended for
 #' use with createDataFrame and toDF.
 #'
-#' @param x a structField object (created with the field() function)
+#' @param x a structField object (created with the \code{structField} method). Since Spark 2.3,
+#'          this can be a DDL-formatted string, which is a comma separated list of field
+#'          definitions, e.g., "a INT, b STRING".
 #' @param ... additional structField objects
 #' @return a structType object
 #' @rdname structType
 #' @export
 #' @examples
 #'\dontrun{
-#' schema <-  structType(structField("a", "integer"), structField("c", "string"),
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
 #'                       structField("avg", "double"))
 #' df1 <- gapply(df, list("a", "c"),
 #'               function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
 #'               schema)
+#' schema <- structType("a INT, c STRING, avg DOUBLE")
+#' df1 <- gapply(df, list("a", "c"),
+#'               function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
+#'               schema)
 #' }
 #' @note structType since 1.4.0
 structType <- function(x, ...) {
@@ -68,6 +74,23 @@ structType.structField <- function(x, ...) {
   structType(stObj)
 }
 
+#' @rdname structType
+#' @method structType character
+#' @export
+structType.character <- function(x, ...) {
+  if (!is.character(x)) {
+    stop("schema must be a DDL-formatted string.")
+  }
+  if (length(list(...)) > 0) {
+    stop("multiple DDL-formatted strings are not supported")
+  }
+
+  stObj <- handledCallJStatic("org.apache.spark.sql.types.StructType",
+                              "fromDDL",
+                              x)
+  structType(stObj)
+}
+
 #' Print a Spark StructType.
 #'
 #' This function prints the contents of a StructType returned from the
@@ -102,7 +125,7 @@ print.structType <- function(x, ...) {
 #' field1 <- structField("a", "integer")
 #' field2 <- structField("c", "string")
 #' field3 <- structField("avg", "double")
-#' schema <-  structType(field1, field2, field3)
+#' schema <- structType(field1, field2, field3)
 #' df1 <- gapply(df, list("a", "c"),
 #'               function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
 #'               schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index a2bcb5a..77052d4 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -146,6 +146,13 @@ test_that("structType and structField", {
   expect_is(testSchema, "structType")
   expect_is(testSchema$fields()[[2]], "structField")
   expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
+
+  testSchema <- structType("a STRING, b INT")
+  expect_is(testSchema, "structType")
+  expect_is(testSchema$fields()[[2]], "structField")
+  expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
+
+  expect_error(structType("A stri"), "DataType stri is not supported.")
 })
 
 test_that("structField type strings", {
@@ -1480,13 +1487,15 @@ test_that("column functions", {
   j <- collect(select(df, alias(to_json(df$info), "json")))
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
   df <- as.DataFrame(j)
-  schema <- structType(structField("age", "integer"),
-                       structField("height", "double"))
-  s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
-  expect_equal(ncol(s), 1)
-  expect_equal(nrow(s), 3)
-  expect_is(s[[1]][[1]], "struct")
-  expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))
+  schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
+                  "age INT, height DOUBLE")
+  for (schema in schemas) {
+    s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
+    expect_equal(ncol(s), 1)
+    expect_equal(nrow(s), 3)
+    expect_is(s[[1]][[1]], "struct")
+    expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))
+  }
 
   # passing option
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
@@ -1504,14 +1513,15 @@ test_that("column functions", {
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
   df <- as.DataFrame(list(list("people" = jsonArr)))
-  schema <- structType(structField("name", "string"))
-  arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
-  expect_equal(ncol(arr), 1)
-  expect_equal(nrow(arr), 1)
-  expect_is(arr[[1]][[1]], "list")
-  expect_equal(length(arr$arrcol[[1]]), 2)
-  expect_equal(arr$arrcol[[1]][[1]]$name, "Bob")
-  expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
+  for (schema in list(structType(structField("name", "string")), "name STRING")) {
+    arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
+    expect_equal(ncol(arr), 1)
+    expect_equal(nrow(arr), 1)
+    expect_is(arr[[1]][[1]], "list")
+    expect_equal(length(arr$arrcol[[1]]), 2)
+    expect_equal(arr$arrcol[[1]][[1]]$name, "Bob")
+    expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
+  }
 
   # Test create_array() and create_map()
   df <- as.DataFrame(data.frame(
@@ -2885,30 +2895,33 @@ test_that("dapply() and dapplyCollect() on a DataFrame", {
   expect_identical(ldf, result)
 
   # Filter and add a column
-  schema <- structType(structField("a", "integer"), structField("b", "double"),
-                       structField("c", "string"), structField("d", "integer"))
-  df1 <- dapply(
-           df,
-           function(x) {
-             y <- x[x$a > 1, ]
-             y <- cbind(y, y$a + 1L)
-           },
-           schema)
-  result <- collect(df1)
-  expected <- ldf[ldf$a > 1, ]
-  expected$d <- expected$a + 1L
-  rownames(expected) <- NULL
-  expect_identical(expected, result)
-
-  result <- dapplyCollect(
-              df,
-              function(x) {
-                y <- x[x$a > 1, ]
-                y <- cbind(y, y$a + 1L)
-              })
-  expected1 <- expected
-  names(expected1) <- names(result)
-  expect_identical(expected1, result)
+  schemas <- list(structType(structField("a", "integer"), structField("b", "double"),
+                             structField("c", "string"), structField("d", "integer")),
+                  "a INT, b DOUBLE, c STRING, d INT")
+  for (schema in schemas) {
+    df1 <- dapply(
+             df,
+             function(x) {
+               y <- x[x$a > 1, ]
+               y <- cbind(y, y$a + 1L)
+             },
+             schema)
+    result <- collect(df1)
+    expected <- ldf[ldf$a > 1, ]
+    expected$d <- expected$a + 1L
+    rownames(expected) <- NULL
+    expect_identical(expected, result)
+
+    result <- dapplyCollect(
+                df,
+                function(x) {
+                  y <- x[x$a > 1, ]
+                  y <- cbind(y, y$a + 1L)
+                })
+    expected1 <- expected
+    names(expected1) <- names(result)
+    expect_identical(expected1, result)
+  }
 
   # Remove the added column
   df2 <- dapply(
@@ -3020,29 +3033,32 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
 
   # Computes the sum of second column by grouping on the first and third columns
   # and checks if the sum is larger than 2
-  schema <- structType(structField("a", "integer"), structField("e", "boolean"))
-  df2 <- gapply(
-    df,
-    c(df$"a", df$"c"),
-    function(key, x) {
-      y <- data.frame(key[1], sum(x$b) > 2)
-    },
-    schema)
-  actual <- collect(df2)$e
-  expected <- c(TRUE, TRUE)
-  expect_identical(actual, expected)
-
-  df2Collect <- gapplyCollect(
-    df,
-    c(df$"a", df$"c"),
-    function(key, x) {
-      y <- data.frame(key[1], sum(x$b) > 2)
-      colnames(y) <- c("a", "e")
-      y
-    })
-    actual <- df2Collect$e
+  schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
+                  "a INT, e BOOLEAN")
+  for (schema in schemas) {
+    df2 <- gapply(
+      df,
+      c(df$"a", df$"c"),
+      function(key, x) {
+        y <- data.frame(key[1], sum(x$b) > 2)
+      },
+      schema)
+    actual <- collect(df2)$e
+    expected <- c(TRUE, TRUE)
     expect_identical(actual, expected)
 
+    df2Collect <- gapplyCollect(
+      df,
+      c(df$"a", df$"c"),
+      function(key, x) {
+        y <- data.frame(key[1], sum(x$b) > 2)
+        colnames(y) <- c("a", "e")
+        y
+      })
+      actual <- df2Collect$e
+      expect_identical(actual, expected)
+  }
+
   # Computes the arithmetic mean of the second column by grouping
   # on the first and third columns. Output the groupping value and the average.
   schema <-  structType(structField("a", "integer"), structField("c", "string"),

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 5d8ded8..f3e7d03 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1883,15 +1883,20 @@ def from_json(col, schema, options={}):
     string.
 
     :param col: string column in json format
-    :param schema: a StructType or ArrayType of StructType to use when parsing the json column
+    :param schema: a StructType or ArrayType of StructType to use when parsing the json column.
     :param options: options to control parsing. accepts the same options as the json datasource
 
+    .. note:: Since Spark 2.3, the DDL-formatted string or a JSON format string is also
+              supported for ``schema``.
+
     >>> from pyspark.sql.types import *
     >>> data = [(1, '''{"a": 1}''')]
     >>> schema = StructType([StructField("a", IntegerType())])
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
     [Row(json=Row(a=1))]
+    >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
+    [Row(json=Row(a=1))]
     >>> data = [(1, '''[{"a": 1}]''')]
     >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
     >>> df = spark.createDataFrame(data, ("key", "value"))
@@ -1900,7 +1905,9 @@ def from_json(col, schema, options={}):
     """
 
     sc = SparkContext._active_spark_context
-    jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
+    if isinstance(schema, DataType):
+        schema = schema.json()
+    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
     return Column(jc)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 0c7b483..ebdeb42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2114,7 +2114,7 @@ object functions {
    * Calculates the hash code of given columns, and returns the result as an int column.
    *
    * @group misc_funcs
-   * @since 2.0
+   * @since 2.0.0
    */
   @scala.annotation.varargs
   def hash(cols: Column*): Column = withExpr {
@@ -3074,9 +3074,8 @@ object functions {
    * string.
    *
    * @param e a string column containing JSON data.
-   * @param schema the schema to use when parsing the json string as a json string. In Spark 2.1,
-   *               the user-provided schema has to be in JSON format. Since Spark 2.2, the DDL
-   *               format is also supported for the schema.
+   * @param schema the schema to use when parsing the json string as a json string, it could be a
+   *               JSON format string or a DDL-formatted string.
    *
    * @group collection_funcs
    * @since 2.3.0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org