Posted to commits@spark.apache.org by gu...@apache.org on 2022/11/27 02:02:43 UTC

[spark] branch master updated: [SPARK-41267][R][SQL] Add unpivot / melt to SparkR

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bcf03fe3f86 [SPARK-41267][R][SQL] Add unpivot / melt to SparkR
bcf03fe3f86 is described below

commit bcf03fe3f86a7230fd977c059b73a58554370d5d
Author: zero323 <ms...@gmail.com>
AuthorDate: Sun Nov 27 11:02:32 2022 +0900

    [SPARK-41267][R][SQL] Add unpivot / melt to SparkR
    
    ### What changes were proposed in this pull request?
    
    This PR adds `unpivot` / `melt` functions to SparkR.
    
    ### Why are the changes needed?
    
    Feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    New functions available to R users.
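
    A minimal usage sketch (the data and the `var` / `val` output column
    names are illustrative, mirroring the example in the docs below; row
    order in the result is not guaranteed):

        df <- createDataFrame(data.frame(
          id = 1:3, x = c(1, 3, 5), y = c(2, 4, 6), z = c(-1, 0, 1)
        ))
        head(unpivot(df, "id", c("x", "y"), "var", "val"))
        #   id var val
        # 1  1   x   1
        # 2  1   y   2
        # ...
        melt(df, "id", c("x", "y"), "var", "val")  # melt is an alias for unpivot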
    
    ### How was this patch tested?
    
    New unit tests.
    
    Closes #38804 from zero323/SPARK-41267.
    
    Authored-by: zero323 <ms...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 R/pkg/NAMESPACE                       |  2 +
 R/pkg/R/DataFrame.R                   | 73 +++++++++++++++++++++++++++++++++++
 R/pkg/R/generics.R                    | 10 +++++
 R/pkg/pkgdown/_pkgdown_template.yml   |  1 +
 R/pkg/tests/fulltests/test_sparkSQL.R | 26 +++++++++++++
 5 files changed, 112 insertions(+)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index e078ba0c2cd..bb05e99a9d8 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -143,6 +143,7 @@ exportMethods("arrange",
               "join",
               "limit",
               "localCheckpoint",
+              "melt",
               "merge",
               "mutate",
               "na.omit",
@@ -182,6 +183,7 @@ exportMethods("arrange",
               "unionByName",
               "unique",
               "unpersist",
+              "unpivot",
               "where",
               "with",
               "withColumn",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5760e1cdd30..456e3d9509f 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -4253,3 +4253,76 @@ setMethod("withWatermark",
             sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
             dataFrame(sdf)
           })
+
+#' Unpivot a DataFrame from wide format to long format.
+#'
+#' This is the reverse of \code{groupBy(...).pivot(...).agg(...)},
+#' except for the aggregation, which cannot be reversed.
+#'
+#' @param x a SparkDataFrame.
+#' @param ids a character vector or a list of columns.
+#' @param values a character vector, a list of columns, or \code{NULL}.
+#'               If not \code{NULL}, it must not be empty. If \code{NULL}, all columns
+#'               that are not set as \code{ids} are used.
+#' @param variableColumnName a character string, the name of the variable column.
+#' @param valueColumnName a character string, the name of the value column.
+#' @return a SparkDataFrame.
+#' @aliases unpivot,SparkDataFrame,ANY,ANY,character,character-method
+#' @family SparkDataFrame functions
+#' @rdname unpivot
+#' @name unpivot
+#' @examples
+#' \dontrun{
+#' df <- createDataFrame(data.frame(
+#'   id = 1:3, x = c(1, 3, 5), y = c(2, 4, 6), z = c(-1, 0, 1)
+#' ))
+#'
+#' head(unpivot(df, "id", c("x", "y"), "var", "val"))
+#'
+#' head(unpivot(df, "id", NULL, "var", "val"))
+#' }
+#' @note unpivot since 3.4.0
+setMethod("unpivot",
+          signature(
+            x = "SparkDataFrame", ids = "ANY", values = "ANY",
+            variableColumnName = "character", valueColumnName = "character"
+          ),
+          function(x, ids, values, variableColumnName, valueColumnName) {
+            # Accept both column names (character) and Column objects.
+            as_jcols <- function(xs) lapply(
+              xs,
+              function(x) {
+                if (is.character(x)) {
+                  column(x)@jc
+                } else {
+                  x@jc
+                }
+              }
+            )
+
+            sdf <- if (is.null(values)) {
+              callJMethod(
+                x@sdf, "unpivotWithSeq", as_jcols(ids), variableColumnName, valueColumnName
+              )
+            } else {
+              callJMethod(
+                x@sdf, "unpivotWithSeq",
+                as_jcols(ids), as_jcols(values),
+                variableColumnName, valueColumnName
+              )
+            }
+            dataFrame(sdf)
+          })
+
+#' @rdname unpivot
+#' @name melt
+#' @aliases melt,SparkDataFrame,ANY,ANY,character,character-method
+#' @note melt since 3.4.0
+setMethod("melt",
+          signature(
+            x = "SparkDataFrame", ids = "ANY", values = "ANY",
+            variableColumnName = "character", valueColumnName = "character"
+          ),
+          function(x, ids, values, variableColumnName, valueColumnName) {
+            unpivot(x, ids, values, variableColumnName, valueColumnName)
+          }
+)
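
A minimal sketch of the two argument forms the `as_jcols` helper above
accepts, assuming the `df` from the roxygen example: ids and values may be
given either as column names (character) or as Column objects.

    unpivot(df, "id", c("x", "y"), "var", "val")              # names
    unpivot(df, list(df$id), list(df$x, df$y), "var", "val")  # Column objects
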
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 93cd0f3bff3..328df50877b 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -670,6 +670,16 @@ setGeneric("randomSplit", function(x, weights, seed) { standardGeneric("randomSp
 #' @rdname broadcast
 setGeneric("broadcast", function(x) { standardGeneric("broadcast") })
 
+#' @rdname unpivot
+setGeneric("unpivot", function(x, ids, values, variableColumnName, valueColumnName) {
+  standardGeneric("unpivot")
+})
+
+#' @rdname melt
+setGeneric("melt", function(x, ids, values, variableColumnName, valueColumnName) {
+  standardGeneric("melt")
+})
+
 ###################### Column Methods ##########################
 
 #' @rdname columnfunctions
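
The generics above make `unpivot` and `melt` dispatch on the class of `x`.
A quick way to inspect the registered methods from an R session, using
standard methods-package tooling (output shown as a sketch):

    library(SparkR)
    showMethods("unpivot")
    # Function: unpivot (package SparkR)
    # x="SparkDataFrame", ids="ANY", values="ANY",
    #     variableColumnName="character", valueColumnName="character"
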
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml
index 1da1d62ee9c..e6b485d4898 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -117,6 +117,7 @@ reference:
   - unionAll
   - unionByName
   - unpersist
+  - unpivot
   - with
   - withColumn
 
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index d2b6220b2e7..e4436b86c2c 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3013,6 +3013,32 @@ test_that("mutate(), transform(), rename() and names()", {
   expect_match(tail(columns(newDF), 1L), "234567890", fixed = TRUE)
 })
 
+test_that("unpivot / melt", {
+  df <- createDataFrame(data.frame(
+    id = 1:3, x = c(1, 3, 5), y = c(2, 4, 6), z = c(-1, 0, 1)
+  ))
+
+  result <- unpivot(df, "id", c("x", "y"), "var", "val")
+  expect_s4_class(result, "SparkDataFrame")
+  expect_equal(columns(result), c("id", "var", "val"))
+  expect_equal(count(distinct(select(result, "var"))), 2)
+
+  result <- unpivot(df, "id", NULL, "variable", "value")
+  expect_s4_class(result, "SparkDataFrame")
+  expect_equal(columns(result), c("id", "variable", "value"))
+  expect_equal(count(distinct(select(result, "variable"))), 3)
+
+  result <- melt(df, "id", c("x", "y"), "key", "value")
+  expect_s4_class(result, "SparkDataFrame")
+  expect_equal(columns(result), c("id", "key", "value"))
+  expect_equal(count(distinct(select(result, "key"))), 2)
+
+  result <- melt(df, "id", NULL, "key", "val")
+  expect_s4_class(result, "SparkDataFrame")
+  expect_equal(columns(result), c("id", "key", "val"))
+  expect_equal(count(distinct(select(result, "key"))), 3)
+})
+
 test_that("read/write ORC files", {
   setHiveContext(sc)
   df <- read.df(jsonPath, "json")
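
One property the tests above rely on implicitly: with this all-non-null
example data, unpivoting n rows over k value columns yields n * k rows.
A sketch of an additional check in the same style:

    result <- unpivot(df, "id", c("x", "y"), "var", "val")
    expect_equal(count(result), 6)  # 3 rows x 2 value columns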

