You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/27 02:02:43 UTC
[spark] branch master updated: [SPARK-41267][R][SQL] Add unpivot / melt to SparkR
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bcf03fe3f86 [SPARK-41267][R][SQL] Add unpivot / melt to SparkR
bcf03fe3f86 is described below
commit bcf03fe3f86a7230fd977c059b73a58554370d5d
Author: zero323 <ms...@gmail.com>
AuthorDate: Sun Nov 27 11:02:32 2022 +0900
[SPARK-41267][R][SQL] Add unpivot / melt to SparkR
### What changes were proposed in this pull request?
This PR adds `unpivot` / `melt` functions to SparkR.
### Why are the changes needed?
Feature parity.
### Does this PR introduce _any_ user-facing change?
New functions available to R users.
### How was this patch tested?
New unit tests.
Closes #38804 from zero323/SPARK-41267.
Authored-by: zero323 <ms...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
R/pkg/NAMESPACE | 2 +
R/pkg/R/DataFrame.R | 73 +++++++++++++++++++++++++++++++++++
R/pkg/R/generics.R | 10 +++++
R/pkg/pkgdown/_pkgdown_template.yml | 1 +
R/pkg/tests/fulltests/test_sparkSQL.R | 26 +++++++++++++
5 files changed, 112 insertions(+)
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index e078ba0c2cd..bb05e99a9d8 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -143,6 +143,7 @@ exportMethods("arrange",
"join",
"limit",
"localCheckpoint",
+ "melt",
"merge",
"mutate",
"na.omit",
@@ -182,6 +183,7 @@ exportMethods("arrange",
"unionByName",
"unique",
"unpersist",
+ "unpivot",
"where",
"with",
"withColumn",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5760e1cdd30..456e3d9509f 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -4253,3 +4253,76 @@ setMethod("withWatermark",
sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
dataFrame(sdf)
})
+
+#' Unpivot a DataFrame from wide format to long format.
+#'
+#' This is the reverse to \code{groupBy(...).pivot(...).agg(...)},
+#' except for the aggregation, which cannot be reversed.
+#'
+#' @param x a SparkDataFrame.
+#' @param ids a character vector or a list of columns
+#' @param values a character vector, a list of columns or \code{NULL}.
+#' If not NULL must not be empty. If \code{NULL}, uses all columns that
+#' are not set as \code{ids}.
+#' @param variableColumnName character Name of the variable column.
+#' @param valueColumnName character Name of the value column.
+#' @return a SparkDataFrame.
+#' @aliases unpivot,SparkDataFrame,ANY,ANY,character,character-method
+#' @family SparkDataFrame functions
+#' @rdname unpivot
+#' @name unpivot
+#' @examples
+#' \dontrun{
+#' df <- createDataFrame(data.frame(
+#' id = 1:3, x = c(1, 3, 5), y = c(2, 4, 6), z = c(-1, 0, 1)
+#' ))
+#'
+#' head(unpivot(df, "id", c("x", "y"), "var", "val"))
+#'
+#' head(unpivot(df, "id", NULL, "var", "val"))
+#' }
+#' @note unpivot since 3.4.0
setMethod("unpivot",
+          signature(
+            x = "SparkDataFrame", ids = "ANY", values = "ANY",
+            variableColumnName = "character", valueColumnName = "character"
+          ),
+          function(x, ids, values, variableColumnName, valueColumnName) {
+            as_jcols <- function(xs) lapply(  # map names / Columns to Java Column refs
+              xs,
+              function(x) {
+                if (is.character(x)) {
+                  column(x)@jc  # column name -> Column, then its Java reference
+                } else {
+                  x@jc  # BUG FIX: was `c@jc` — `c` is base::c, which has no `jc` slot
+                }
+              }
+            )
+
+            sdf <- if (is.null(values)) {  # NULL values: JVM side uses all non-id columns
+              callJMethod(
+                x@sdf, "unpivotWithSeq", as_jcols(ids), variableColumnName, valueColumnName
+              )
+            } else {
+              callJMethod(
+                x@sdf, "unpivotWithSeq",
+                as_jcols(ids), as_jcols(values),
+                variableColumnName, valueColumnName
+              )
+            }
+            dataFrame(sdf)  # wrap the returned Java DataFrame as a SparkDataFrame
+          })
+
+#' @rdname unpivot
+#' @name melt
+#' @aliases melt,SparkDataFrame,ANY,ANY,character,character-method
+#' @note melt since 3.4.0
setMethod("melt",
+ signature(
+ x = "SparkDataFrame", ids = "ANY", values = "ANY",
+ variableColumnName = "character", valueColumnName = "character"
+ ),
+ function(x, ids, values, variableColumnName, valueColumnName) {
+ unpivot(x, ids, values, variableColumnName, valueColumnName) # melt is a pure alias: delegates unchanged to unpivot
+ }
+)
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 93cd0f3bff3..328df50877b 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -670,6 +670,16 @@ setGeneric("randomSplit", function(x, weights, seed) { standardGeneric("randomSp
#' @rdname broadcast
setGeneric("broadcast", function(x) { standardGeneric("broadcast") })
+#' @rdname unpivot
+setGeneric("unpivot", function(x, ids, values, variableColumnName, valueColumnName) { # S4 generic; SparkDataFrame method lives in DataFrame.R
+ standardGeneric("unpivot")
+})
+
+#' @rdname melt
+setGeneric("melt", function(x, ids, values, variableColumnName, valueColumnName) { # NOTE(review): generic grouped under @rdname melt while the method uses @rdname unpivot — confirm intended Rd page grouping
+ standardGeneric("melt")
+})
+
###################### Column Methods ##########################
#' @rdname columnfunctions
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml
index 1da1d62ee9c..e6b485d4898 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -117,6 +117,7 @@ reference:
- unionAll
- unionByName
- unpersist
+ - unpivot
- with
- withColumn
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index d2b6220b2e7..e4436b86c2c 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3013,6 +3013,32 @@ test_that("mutate(), transform(), rename() and names()", {
expect_match(tail(columns(newDF), 1L), "234567890", fixed = TRUE)
})
+test_that("unpivot / melt", {
+ df <- createDataFrame(data.frame( # wide frame: one id column plus three value columns
+ id = 1:3, x = c(1, 3, 5), y = c(2, 4, 6), z = c(-1, 0, 1)
+ ))
+
+ result <- unpivot(df, "id", c("x", "y"), "var", "val") # explicit value columns
+ expect_s4_class(result, "SparkDataFrame")
+ expect_equal(columns(result), c("id", "var", "val"))
+ expect_equal(count(distinct(select(result, "var"))), 2) # two unpivoted columns: x, y
+
+ result <- unpivot(df, "id", NULL, "variable", "value") # NULL values -> all non-id columns
+ expect_s4_class(result, "SparkDataFrame")
+ expect_equal(columns(result), c("id", "variable", "value"))
+ expect_equal(count(distinct(select(result, "variable"))), 3) # x, y and z
+
+ result <- melt(df, "id", c("x", "y"), "key", "value") # melt alias, explicit values
+ expect_s4_class(result, "SparkDataFrame")
+ expect_equal(columns(result), c("id", "key", "value"))
+ expect_equal(count(distinct(select(result, "key"))), 2) # two unpivoted columns: x, y
+
+ result <- melt(df, "id", NULL, "key", "val") # melt alias, NULL values
+ expect_s4_class(result, "SparkDataFrame")
+ expect_equal(columns(result), c("id", "key", "val"))
+ expect_equal(count(distinct(select(result, "key"))), 3) # x, y and z
+})
+
test_that("read/write ORC files", {
setHiveContext(sc)
df <- read.df(jsonPath, "json")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org