You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/02/15 18:45:40 UTC

spark git commit: [SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column

Repository: spark
Updated Branches:
  refs/heads/master c97f4e17d -> 671bc08ed


[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column

## What changes were proposed in this pull request?

Add coalesce on DataFrame (to reduce the number of partitions without a shuffle) and coalesce on Column (to return the first non-NA column)

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <fe...@hotmail.com>

Closes #16739 from felixcheung/rcoalesce.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/671bc08e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/671bc08e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/671bc08e

Branch: refs/heads/master
Commit: 671bc08ed502815bfa2254c30d64149402acb0c7
Parents: c97f4e1
Author: Felix Cheung <fe...@hotmail.com>
Authored: Wed Feb 15 10:45:37 2017 -0800
Committer: Felix Cheung <fe...@apache.org>
Committed: Wed Feb 15 10:45:37 2017 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |  1 +
 R/pkg/R/DataFrame.R                             | 46 ++++++++++++++++++--
 R/pkg/R/RDD.R                                   |  4 +-
 R/pkg/R/functions.R                             | 26 ++++++++++-
 R/pkg/R/generics.R                              |  9 +++-
 R/pkg/inst/tests/testthat/test_rdd.R            |  2 +-
 R/pkg/inst/tests/testthat/test_sparkSQL.R       | 32 +++++++++++---
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  3 +-
 python/pyspark/sql/dataframe.py                 | 10 ++++-
 .../scala/org/apache/spark/sql/Dataset.scala    | 10 ++++-
 .../sql/execution/basicPhysicalOperators.scala  | 10 ++++-
 11 files changed, 135 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8b26500..81e1936 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "coalesce",
               "collect",
               "colnames",
               "colnames<-",

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5bca410..cf331ba 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -678,14 +678,53 @@ setMethod("storageLevel",
             storageLevelToString(callJMethod(x@sdf, "storageLevel"))
           })
 
+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions) {
+            stopifnot(is.numeric(numPartitions))
+            sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
 #' Repartition
 #'
 #' The following options for repartition are possible:
 #' \itemize{
-#'  \item{1.} {Return a new SparkDataFrame partitioned by
+#'  \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions} partitions.}
+#'  \item{2.} {Return a new SparkDataFrame hash partitioned by
 #'                      the given columns into \code{numPartitions}.}
-#'  \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#'  \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#'  \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
 #'                      using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
 #' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
 #' @export
 #' @examples
 #'\dontrun{

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 91bab33..5667b9d 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
           signature(x = "RDD"),
           function(x, numPartitions) {
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-              coalesce(x, numPartitions, TRUE)
+              coalesceRDD(x, numPartitions, TRUE)
             } else {
               stop("Please, specify the number of partitions")
             }
@@ -1049,7 +1049,7 @@ setMethod("repartitionRDD",
 #' @rdname coalesce
 #' @aliases coalesce,RDD
 #' @noRd
-setMethod("coalesce",
+setMethod("coalesceRDD",
            signature(x = "RDD", numPartitions = "numeric"),
            function(x, numPartitions, shuffle = FALSE) {
              numPartitions <- numToInt(numPartitions)

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 032cfec..9e50844 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -286,6 +286,28 @@ setMethod("ceil",
             column(jc)
           })
 
+#' Returns the first column that is not NA
+#'
+#' Returns the first column that is not NA, or NA if all inputs are.
+#'
+#' @rdname coalesce
+#' @name coalesce
+#' @family normal_funcs
+#' @export
+#' @aliases coalesce,Column-method
+#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
+#' @note coalesce(Column) since 2.1.1
+setMethod("coalesce",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
+            column(jc)
+          })
+
 #' Though scala functions has "col" function, we don't expose it in SparkR
 #' because we don't want to conflict with the "col" function in the R base
 #' package and we also have "column" function exported which is an alias of "col".
@@ -297,7 +319,7 @@ col <- function(x) {
 #' Returns a Column based on the given column name
 #'
 #' Returns a Column based on the given column name.
-#
+#'
 #' @param x Character column name.
 #'
 #' @rdname column
@@ -305,7 +327,7 @@ col <- function(x) {
 #' @family normal_funcs
 #' @export
 #' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
 #' @note column since 1.6.0
 setMethod("column",
           signature(x = "character"),

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 0d9a996..68864e6 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
 # @rdname coalesce
 # @seealso repartition
 # @export
-setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })
 
 # @rdname checkpoint-methods
 # @export
@@ -406,6 +406,13 @@ setGeneric("attach")
 #' @export
 setGeneric("cache", function(x) { standardGeneric("cache") })
 
+#' @rdname coalesce
+#' @param x a Column or a SparkDataFrame.
+#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
+#'        provided.
+#' @export
+setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })
+
 #' @rdname collect
 #' @export
 setGeneric("collect", function(x, ...) { standardGeneric("collect") })

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/inst/tests/testthat/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index ceb31bd..787ef51 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", {
   expect_true(count >= 0 && count <= 4)
 
   # coalesce
-  r3 <- coalesce(rdd, 1)
+  r3 <- coalesceRDD(rdd, 1)
   expect_equal(getNumPartitionsRDD(r3), 1L)
   count <- length(collectPartition(r3, 0L))
   expect_equal(count, 20)

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 1494ebb..199eb20 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", {
   objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
   df <- read.json(jsonPath)
   dfRDD <- toRDD(df)
-  saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
+  saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
   objectIn <- objectFile(sc, objectPath)
 
   expect_is(objectIn, "RDD")
@@ -1236,7 +1236,7 @@ test_that("column functions", {
   c16 <- is.nan(c) + isnan(c) + isNaN(c)
   c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
   c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
-  c19 <- spark_partition_id()
+  c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
   c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")
 
   # Test if base::is.nan() is exposed
@@ -2491,15 +2491,18 @@ test_that("repartition by columns on DataFrame", {
     ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
 
   # repartition by column and number of partitions
-  actual <- repartition(df, 3L, col = df$"a")
+  actual <- repartition(df, 3, col = df$"a")
 
-  # since we cannot access the number of partitions from dataframe, checking
-  # that at least the dimensions are identical
+  # Checking that at least the dimensions are identical
   expect_identical(dim(df), dim(actual))
+  expect_equal(getNumPartitions(actual), 3L)
 
   # repartition by number of partitions
   actual <- repartition(df, 13L)
   expect_identical(dim(df), dim(actual))
+  expect_equal(getNumPartitions(actual), 13L)
+
+  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
 
   # a test case with a column and dapply
   schema <-  structType(structField("a", "integer"), structField("avg", "double"))
@@ -2515,6 +2518,25 @@ test_that("repartition by columns on DataFrame", {
   expect_equal(nrow(df1), 2)
 })
 
+test_that("coalesce, repartition, numPartitions", {
+  df <- as.DataFrame(cars, numPartitions = 5)
+  expect_equal(getNumPartitions(df), 5)
+  expect_equal(getNumPartitions(coalesce(df, 3)), 3)
+  expect_equal(getNumPartitions(coalesce(df, 6)), 5)
+
+  df1 <- coalesce(df, 3)
+  expect_equal(getNumPartitions(df1), 3)
+  expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
+  expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
+  expect_equal(getNumPartitions(coalesce(df1, 2)), 2)
+
+  df2 <- repartition(df1, 10)
+  expect_equal(getNumPartitions(df2), 10)
+  expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
+  expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
+  expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
+})
+
 test_that("gapply() and gapplyCollect() on a DataFrame", {
   df <- createDataFrame (
     list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0359508..e524675 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -423,7 +423,8 @@ abstract class RDD[T: ClassTag](
    *
    * This results in a narrow dependency, e.g. if you go from 1000 partitions
    * to 100 partitions, there will not be a shuffle, instead each of the 100
-   * new partitions will claim 10 of the current partitions.
+   * new partitions will claim 10 of the current partitions. If a larger number
+   * of partitions is requested, it will stay at the current number of partitions.
    *
    * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
    * this may result in your computation taking place on fewer nodes than

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 188808b..70efeaf 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -515,7 +515,15 @@ class DataFrame(object):
         Similar to coalesce defined on an :class:`RDD`, this operation results in a
         narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
         there will not be a shuffle, instead each of the 100 new partitions will
-        claim 10 of the current partitions.
+        claim 10 of the current partitions. If a larger number of partitions is requested,
+        it will stay at the current number of partitions.
+
+        However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+        this may result in your computation taking place on fewer nodes than
+        you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+        you can call repartition(). This will add a shuffle step, but means the
+        current upstream partitions will be executed in parallel (per whatever
+        the current partitioning is).
 
         >>> df.coalesce(1).rdd.getNumPartitions()
         1

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ce6e8be..6b80ff4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2432,7 +2432,15 @@ class Dataset[T] private[sql](
    * Returns a new Dataset that has exactly `numPartitions` partitions.
    * Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
    * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
-   * the 100 new partitions will claim 10 of the current partitions.
+   * the 100 new partitions will claim 10 of the current partitions.  If a larger number of
+   * partitions is requested, it will stay at the current number of partitions.
+   *
+   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+   * this may result in your computation taking place on fewer nodes than
+   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+   * you can call repartition. This will add a shuffle step, but means the
+   * current upstream partitions will be executed in parallel (per whatever
+   * the current partitioning is).
    *
    * @group typedrel
    * @since 1.6.0

http://git-wip-us.apache.org/repos/asf/spark/blob/671bc08e/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 649c21b..c01f9c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -541,7 +541,15 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
  * Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
  * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
  * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
- * the 100 new partitions will claim 10 of the current partitions.
+ * the 100 new partitions will claim 10 of the current partitions.  If a larger number of partitions
+ * is requested, it will stay at the current number of partitions.
+ *
+ * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+ * this may result in your computation taking place on fewer nodes than
+ * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+ * see ShuffleExchange. This will add a shuffle step, but means the
+ * current upstream partitions will be executed in parallel (per whatever
+ * the current partitioning is).
  */
 case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org