You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/07/10 17:40:07 UTC
spark git commit: [SPARK-21266][R][PYTHON] Support schema a
DDL-formatted string in dapply/gapply/from_json
Repository: spark
Updated Branches:
refs/heads/master 18b3b00ec -> 2bfd5accd
[SPARK-21266][R][PYTHON] Support schema a DDL-formatted string in dapply/gapply/from_json
## What changes were proposed in this pull request?
This PR supports schema in a DDL formatted string for `from_json` in R/Python and `dapply` and `gapply` in R, which are commonly used and/or consistent with Scala APIs.
Additionally, this PR exposes `structType` in R to allow workarounds in other possible corner cases.
**Python**
`from_json`
```python
from pyspark.sql.functions import from_json
data = [(1, '''{"a": 1}''')]
df = spark.createDataFrame(data, ("key", "value"))
df.select(from_json(df.value, "a INT").alias("json")).show()
```
**R**
`from_json`
```R
df <- sql("SELECT named_struct('name', 'Bob') as people")
df <- mutate(df, people_json = to_json(df$people))
head(select(df, from_json(df$people_json, "name STRING")))
```
`structType.character`
```R
structType("a STRING, b INT")
```
`dapply`
```R
dapply(createDataFrame(list(list(1.0)), "a"), function(x) {x}, "a DOUBLE")
```
`gapply`
```R
gapply(createDataFrame(list(list(1.0)), "a"), "a", function(key, x) { x }, "a DOUBLE")
```
## How was this patch tested?
Doc tests for `from_json` in Python and unit tests in `test_sparkSQL.R` for R.
Author: hyukjinkwon <gu...@gmail.com>
Closes #18498 from HyukjinKwon/SPARK-21266.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bfd5acc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bfd5acc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bfd5acc
Branch: refs/heads/master
Commit: 2bfd5accdce2ae31feeeddf213a019cf8ec97663
Parents: 18b3b00
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Jul 10 10:40:03 2017 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Mon Jul 10 10:40:03 2017 -0700
----------------------------------------------------------------------
R/pkg/NAMESPACE | 2 +
R/pkg/R/DataFrame.R | 36 ++++-
R/pkg/R/functions.R | 12 +-
R/pkg/R/group.R | 3 +
R/pkg/R/schema.R | 29 +++-
R/pkg/tests/fulltests/test_sparkSQL.R | 136 +++++++++++--------
python/pyspark/sql/functions.py | 11 +-
.../scala/org/apache/spark/sql/functions.scala | 7 +-
8 files changed, 160 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index b7fdae5..232f5cf 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -429,6 +429,7 @@ export("structField",
"structField.character",
"print.structField",
"structType",
+ "structType.character",
"structType.jobj",
"structType.structField",
"print.structType")
@@ -465,5 +466,6 @@ S3method(print, summary.GBTRegressionModel)
S3method(print, summary.GBTClassificationModel)
S3method(structField, character)
S3method(structField, jobj)
+S3method(structType, character)
S3method(structType, jobj)
S3method(structType, structField)
http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3b9d42d..e7a166c 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1391,6 +1391,10 @@ setMethod("summarize",
})
dapplyInternal <- function(x, func, schema) {
+ if (is.character(schema)) {
+ schema <- structType(schema)
+ }
+
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
@@ -1408,6 +1412,8 @@ dapplyInternal <- function(x, func, schema) {
dataFrame(sdf)
}
+setClassUnion("characterOrstructType", c("character", "structType"))
+
#' dapply
#'
#' Apply a function to each partition of a SparkDataFrame.
@@ -1418,10 +1424,11 @@ dapplyInternal <- function(x, func, schema) {
#' to each partition will be passed.
#' The output of func should be a R data.frame.
#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
-#' It must match the output of func.
+#' It must match the output of func. Since Spark 2.3, the DDL-formatted string
+#' is also supported for the schema.
#' @family SparkDataFrame functions
#' @rdname dapply
-#' @aliases dapply,SparkDataFrame,function,structType-method
+#' @aliases dapply,SparkDataFrame,function,characterOrstructType-method
#' @name dapply
#' @seealso \link{dapplyCollect}
#' @export
@@ -1444,6 +1451,17 @@ dapplyInternal <- function(x, func, schema) {
#' y <- cbind(y, y[1] + 1L)
#' },
#' schema)
+#'
+#' # The schema also can be specified in a DDL-formatted string.
+#' schema <- "a INT, b DOUBLE, c STRING, d INT"
+#' df1 <- dapply(
+#' df,
+#' function(x) {
+#' y <- x[x[1] > 1, ]
+#' y <- cbind(y, y[1] + 1L)
+#' },
+#' schema)
+#'
#' collect(df1)
#' # the result
#' # a b c d
@@ -1452,7 +1470,7 @@ dapplyInternal <- function(x, func, schema) {
#' }
#' @note dapply since 2.0.0
setMethod("dapply",
- signature(x = "SparkDataFrame", func = "function", schema = "structType"),
+ signature(x = "SparkDataFrame", func = "function", schema = "characterOrstructType"),
function(x, func, schema) {
dapplyInternal(x, func, schema)
})
@@ -1522,6 +1540,7 @@ setMethod("dapplyCollect",
#' @param schema the schema of the resulting SparkDataFrame after the function is applied.
#' The schema must match to output of \code{func}. It has to be defined for each
#' output column with preferred output column name and corresponding data type.
+#' Since Spark 2.3, the DDL-formatted string is also supported for the schema.
#' @return A SparkDataFrame.
#' @family SparkDataFrame functions
#' @aliases gapply,SparkDataFrame-method
@@ -1541,7 +1560,7 @@ setMethod("dapplyCollect",
#'
#' Here our output contains three columns, the key which is a combination of two
#' columns with data types integer and string and the mean which is a double.
-#' schema <- structType(structField("a", "integer"), structField("c", "string"),
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' result <- gapply(
#' df,
@@ -1550,6 +1569,15 @@ setMethod("dapplyCollect",
#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
#' }, schema)
#'
+#' The schema also can be specified in a DDL-formatted string.
+#' schema <- "a INT, c STRING, avg DOUBLE"
+#' result <- gapply(
+#' df,
+#' c("a", "c"),
+#' function(key, x) {
+#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#' }, schema)
+#'
#' We can also group the data and afterwards call gapply on GroupedData.
#' For Example:
#' gdf <- group_by(df, "a", "c")
http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index f28d26a..86507f1 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2174,8 +2174,9 @@ setMethod("date_format", signature(y = "Column", x = "character"),
#'
#' @rdname column_collection_functions
#' @param schema a structType object to use as the schema to use when parsing the JSON string.
+#' Since Spark 2.3, the DDL-formatted string is also supported for the schema.
#' @param as.json.array indicating if input string is JSON array of objects or a single object.
-#' @aliases from_json from_json,Column,structType-method
+#' @aliases from_json from_json,Column,characterOrstructType-method
#' @export
#' @examples
#'
@@ -2188,10 +2189,15 @@ setMethod("date_format", signature(y = "Column", x = "character"),
#' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#' schema <- structType(structField("name", "string"))
-#' head(select(df2, from_json(df2$people_json, schema)))}
+#' head(select(df2, from_json(df2$people_json, schema)))
+#' head(select(df2, from_json(df2$people_json, "name STRING")))}
#' @note from_json since 2.2.0
-setMethod("from_json", signature(x = "Column", schema = "structType"),
+setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
function(x, schema, as.json.array = FALSE, ...) {
+ if (is.character(schema)) {
+ schema <- structType(schema)
+ }
+
if (as.json.array) {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/group.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 17f5283..0a7be0e 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -233,6 +233,9 @@ setMethod("gapplyCollect",
})
gapplyInternal <- function(x, func, schema) {
+ if (is.character(schema)) {
+ schema <- structType(schema)
+ }
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/R/schema.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/schema.R b/R/pkg/R/schema.R
index cb5bdb9..d1ed683 100644
--- a/R/pkg/R/schema.R
+++ b/R/pkg/R/schema.R
@@ -23,18 +23,24 @@
#' Create a structType object that contains the metadata for a SparkDataFrame. Intended for
#' use with createDataFrame and toDF.
#'
-#' @param x a structField object (created with the field() function)
+#' @param x a structField object (created with the \code{structField} method). Since Spark 2.3,
+#' this can be a DDL-formatted string, which is a comma separated list of field
+#' definitions, e.g., "a INT, b STRING".
#' @param ... additional structField objects
#' @return a structType object
#' @rdname structType
#' @export
#' @examples
#'\dontrun{
-#' schema <- structType(structField("a", "integer"), structField("c", "string"),
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
#' structField("avg", "double"))
#' df1 <- gapply(df, list("a", "c"),
#' function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
#' schema)
+#' schema <- structType("a INT, c STRING, avg DOUBLE")
+#' df1 <- gapply(df, list("a", "c"),
+#' function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
+#' schema)
#' }
#' @note structType since 1.4.0
structType <- function(x, ...) {
@@ -68,6 +74,23 @@ structType.structField <- function(x, ...) {
structType(stObj)
}
+#' @rdname structType
+#' @method structType character
+#' @export
+structType.character <- function(x, ...) {
+ if (!is.character(x)) {
+ stop("schema must be a DDL-formatted string.")
+ }
+ if (length(list(...)) > 0) {
+ stop("multiple DDL-formatted strings are not supported")
+ }
+
+ stObj <- handledCallJStatic("org.apache.spark.sql.types.StructType",
+ "fromDDL",
+ x)
+ structType(stObj)
+}
+
#' Print a Spark StructType.
#'
#' This function prints the contents of a StructType returned from the
@@ -102,7 +125,7 @@ print.structType <- function(x, ...) {
#' field1 <- structField("a", "integer")
#' field2 <- structField("c", "string")
#' field3 <- structField("avg", "double")
-#' schema <- structType(field1, field2, field3)
+#' schema <- structType(field1, field2, field3)
#' df1 <- gapply(df, list("a", "c"),
#' function(key, x) { y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) },
#' schema)
http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index a2bcb5a..77052d4 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -146,6 +146,13 @@ test_that("structType and structField", {
expect_is(testSchema, "structType")
expect_is(testSchema$fields()[[2]], "structField")
expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
+
+ testSchema <- structType("a STRING, b INT")
+ expect_is(testSchema, "structType")
+ expect_is(testSchema$fields()[[2]], "structField")
+ expect_equal(testSchema$fields()[[1]]$dataType.toString(), "StringType")
+
+ expect_error(structType("A stri"), "DataType stri is not supported.")
})
test_that("structField type strings", {
@@ -1480,13 +1487,15 @@ test_that("column functions", {
j <- collect(select(df, alias(to_json(df$info), "json")))
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
df <- as.DataFrame(j)
- schema <- structType(structField("age", "integer"),
- structField("height", "double"))
- s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
- expect_equal(ncol(s), 1)
- expect_equal(nrow(s), 3)
- expect_is(s[[1]][[1]], "struct")
- expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))
+ schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
+ "age INT, height DOUBLE")
+ for (schema in schemas) {
+ s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
+ expect_equal(ncol(s), 1)
+ expect_equal(nrow(s), 3)
+ expect_is(s[[1]][[1]], "struct")
+ expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))
+ }
# passing option
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
@@ -1504,14 +1513,15 @@ test_that("column functions", {
# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
df <- as.DataFrame(list(list("people" = jsonArr)))
- schema <- structType(structField("name", "string"))
- arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
- expect_equal(ncol(arr), 1)
- expect_equal(nrow(arr), 1)
- expect_is(arr[[1]][[1]], "list")
- expect_equal(length(arr$arrcol[[1]]), 2)
- expect_equal(arr$arrcol[[1]][[1]]$name, "Bob")
- expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
+ for (schema in list(structType(structField("name", "string")), "name STRING")) {
+ arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
+ expect_equal(ncol(arr), 1)
+ expect_equal(nrow(arr), 1)
+ expect_is(arr[[1]][[1]], "list")
+ expect_equal(length(arr$arrcol[[1]]), 2)
+ expect_equal(arr$arrcol[[1]][[1]]$name, "Bob")
+ expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
+ }
# Test create_array() and create_map()
df <- as.DataFrame(data.frame(
@@ -2885,30 +2895,33 @@ test_that("dapply() and dapplyCollect() on a DataFrame", {
expect_identical(ldf, result)
# Filter and add a column
- schema <- structType(structField("a", "integer"), structField("b", "double"),
- structField("c", "string"), structField("d", "integer"))
- df1 <- dapply(
- df,
- function(x) {
- y <- x[x$a > 1, ]
- y <- cbind(y, y$a + 1L)
- },
- schema)
- result <- collect(df1)
- expected <- ldf[ldf$a > 1, ]
- expected$d <- expected$a + 1L
- rownames(expected) <- NULL
- expect_identical(expected, result)
-
- result <- dapplyCollect(
- df,
- function(x) {
- y <- x[x$a > 1, ]
- y <- cbind(y, y$a + 1L)
- })
- expected1 <- expected
- names(expected1) <- names(result)
- expect_identical(expected1, result)
+ schemas <- list(structType(structField("a", "integer"), structField("b", "double"),
+ structField("c", "string"), structField("d", "integer")),
+ "a INT, b DOUBLE, c STRING, d INT")
+ for (schema in schemas) {
+ df1 <- dapply(
+ df,
+ function(x) {
+ y <- x[x$a > 1, ]
+ y <- cbind(y, y$a + 1L)
+ },
+ schema)
+ result <- collect(df1)
+ expected <- ldf[ldf$a > 1, ]
+ expected$d <- expected$a + 1L
+ rownames(expected) <- NULL
+ expect_identical(expected, result)
+
+ result <- dapplyCollect(
+ df,
+ function(x) {
+ y <- x[x$a > 1, ]
+ y <- cbind(y, y$a + 1L)
+ })
+ expected1 <- expected
+ names(expected1) <- names(result)
+ expect_identical(expected1, result)
+ }
# Remove the added column
df2 <- dapply(
@@ -3020,29 +3033,32 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
# Computes the sum of second column by grouping on the first and third columns
# and checks if the sum is larger than 2
- schema <- structType(structField("a", "integer"), structField("e", "boolean"))
- df2 <- gapply(
- df,
- c(df$"a", df$"c"),
- function(key, x) {
- y <- data.frame(key[1], sum(x$b) > 2)
- },
- schema)
- actual <- collect(df2)$e
- expected <- c(TRUE, TRUE)
- expect_identical(actual, expected)
-
- df2Collect <- gapplyCollect(
- df,
- c(df$"a", df$"c"),
- function(key, x) {
- y <- data.frame(key[1], sum(x$b) > 2)
- colnames(y) <- c("a", "e")
- y
- })
- actual <- df2Collect$e
+ schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
+ "a INT, e BOOLEAN")
+ for (schema in schemas) {
+ df2 <- gapply(
+ df,
+ c(df$"a", df$"c"),
+ function(key, x) {
+ y <- data.frame(key[1], sum(x$b) > 2)
+ },
+ schema)
+ actual <- collect(df2)$e
+ expected <- c(TRUE, TRUE)
expect_identical(actual, expected)
+ df2Collect <- gapplyCollect(
+ df,
+ c(df$"a", df$"c"),
+ function(key, x) {
+ y <- data.frame(key[1], sum(x$b) > 2)
+ colnames(y) <- c("a", "e")
+ y
+ })
+ actual <- df2Collect$e
+ expect_identical(actual, expected)
+ }
+
# Computes the arithmetic mean of the second column by grouping
# on the first and third columns. Output the grouping value and the average.
schema <- structType(structField("a", "integer"), structField("c", "string"),
http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 5d8ded8..f3e7d03 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1883,15 +1883,20 @@ def from_json(col, schema, options={}):
string.
:param col: string column in json format
- :param schema: a StructType or ArrayType of StructType to use when parsing the json column
+ :param schema: a StructType or ArrayType of StructType to use when parsing the json column.
:param options: options to control parsing. accepts the same options as the json datasource
+ .. note:: Since Spark 2.3, the DDL-formatted string or a JSON format string is also
+ supported for ``schema``.
+
>>> from pyspark.sql.types import *
>>> data = [(1, '''{"a": 1}''')]
>>> schema = StructType([StructField("a", IntegerType())])
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=Row(a=1))]
+ >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
+ [Row(json=Row(a=1))]
>>> data = [(1, '''[{"a": 1}]''')]
>>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
>>> df = spark.createDataFrame(data, ("key", "value"))
@@ -1900,7 +1905,9 @@ def from_json(col, schema, options={}):
"""
sc = SparkContext._active_spark_context
- jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
+ if isinstance(schema, DataType):
+ schema = schema.json()
+ jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
return Column(jc)
http://git-wip-us.apache.org/repos/asf/spark/blob/2bfd5acc/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 0c7b483..ebdeb42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2114,7 +2114,7 @@ object functions {
* Calculates the hash code of given columns, and returns the result as an int column.
*
* @group misc_funcs
- * @since 2.0
+ * @since 2.0.0
*/
@scala.annotation.varargs
def hash(cols: Column*): Column = withExpr {
@@ -3074,9 +3074,8 @@ object functions {
* string.
*
* @param e a string column containing JSON data.
- * @param schema the schema to use when parsing the json string as a json string. In Spark 2.1,
- * the user-provided schema has to be in JSON format. Since Spark 2.2, the DDL
- * format is also supported for the schema.
+ * @param schema the schema to use when parsing the json string, specified as a string. It can be a
+ * JSON format string or a DDL-formatted string.
*
* @group collection_funcs
* @since 2.3.0
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org