You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2017/11/27 01:09:58 UTC
spark git commit: [SPARK-21693][R][FOLLOWUP] Reduce shuffle
partitions running R worker in few tests to speed up
Repository: spark
Updated Branches:
refs/heads/master fba63c1a7 -> d49d9e403
[SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions running R worker in few tests to speed up
## What changes were proposed in this pull request?
This is a follow-up to reduce AppVeyor test time. This PR proposes reducing the number of shuffle partitions in a few particular tests, which reduces the number of tasks that launch R workers.
The symptom is similar to the one described in `https://github.com/apache/spark/pull/19722`. On Windows, R workers cannot be forked, so many new R processes are launched instead. This accounts for the difference in elapsed time between Linux and Windows.
Here is a simple before/after comparison for this change. I tested it manually by disabling `spark.sparkr.use.daemon`, which mimics how the tests run on Windows:
**Before**
<img width="672" alt="2017-11-25 12 22 13" src="https://user-images.githubusercontent.com/6477701/33217949-b5528dfa-d17d-11e7-8050-75675c39eb20.png">
**After**
<img width="682" alt="2017-11-25 12 32 00" src="https://user-images.githubusercontent.com/6477701/33217958-c6518052-d17d-11e7-9f8e-1be21a784559.png">
So this will probably reduce the test time by roughly 10 minutes or more.
## How was this patch tested?
AppVeyor tests
Author: hyukjinkwon <gu...@gmail.com>
Closes #19816 from HyukjinKwon/SPARK-21693-followup.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d49d9e40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d49d9e40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d49d9e40
Branch: refs/heads/master
Commit: d49d9e40383209eed9584a4ef2c3964f27f4a08f
Parents: fba63c1
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Nov 27 10:09:53 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Mon Nov 27 10:09:53 2017 +0900
----------------------------------------------------------------------
R/pkg/tests/fulltests/test_sparkSQL.R | 267 ++++++++++++++++-------------
1 file changed, 148 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d49d9e40/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 00217c8..d87f5d2 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3021,41 +3021,54 @@ test_that("dapplyCollect() on DataFrame with a binary column", {
})
test_that("repartition by columns on DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-
-  # no column and number of partitions specified
-  retError <- tryCatch(repartition(df), error = function(e) e)
-  expect_equal(grepl
-    ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
-
-  # repartition by column and number of partitions
-  actual <- repartition(df, 3, col = df$"a")
-
-  # Checking that at least the dimensions are identical
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 3L)
-
-  # repartition by number of partitions
-  actual <- repartition(df, 13L)
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 13L)
-
-  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
-
-  # a test case with a column and dapply
-  schema <- structType(structField("a", "integer"), structField("avg", "double"))
-  df <- repartition(df, col = df$"a")
-  df1 <- dapply(
-    df,
-    function(x) {
-      y <- (data.frame(x$a[1], mean(x$b)))
-    },
-    schema)
+  # These tasks shuffle data and launch R workers, so we lower the number of shuffle
+  # partitions to reduce the number of tasks and speed up the test. This matters most on
+  # Windows, where R workers cannot be forked and new R processes are launched. See SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+
+    # neither a column nor a number of partitions specified: repartition() should error
+    retError <- tryCatch(repartition(df), error = function(e) e)
+    expect_equal(grepl
+      ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
+
+    # repartition by column and number of partitions
+    actual <- repartition(df, 3, col = df$"a")
+
+    # Check that at least the dimensions are identical
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 3L)
+
+    # repartition by number of partitions
+    actual <- repartition(df, 13L)
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 13L)
+
+    expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
+
+    # a test case with a column and dapply
+    schema <- structType(structField("a", "integer"), structField("avg", "double"))
+    df <- repartition(df, col = df$"a")
+
+    df1 <- dapply(
+      df,
+      function(x) {
+        y <- (data.frame(x$a[1], mean(x$b)))
+      },
+      schema)
-  # Number of partitions is equal to 2
-  expect_equal(nrow(df1), 2)
+    # Number of partitions is equal to 2
+    expect_equal(nrow(df1), 2)
+  },
+  finally = {
+    # Restore the value saved before this test (not necessarily the default)
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
})
test_that("coalesce, repartition, numPartitions", {
@@ -3078,101 +3091,117 @@ test_that("coalesce, repartition, numPartitions", {
})
test_that("gapply() and gapplyCollect() on a DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-  expected <- collect(df)
-  df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
-  expect_identical(df1Collect, expected)
-
-  # gapply on empty grouping columns.
-  df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  # Computes the sum of second column by grouping on the first and third columns
-  # and checks if the sum is larger than 2
-  schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
-                  "a INT, e BOOLEAN")
-  for (schema in schemas) {
-    df2 <- gapply(
+  # These tasks shuffle data and launch R workers, so we lower the number of shuffle
+  # partitions to reduce the number of tasks and speed up the test. This matters most on
+  # Windows, where R workers cannot be forked and new R processes are launched. See SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  # TODO: a lower 'spark.sql.shuffle.partitions' value makes this test fail for
+  # an unknown reason. This should be investigated and fixed.
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "16")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+    expected <- collect(df)
+    df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
+    expect_identical(df1Collect, expected)
+
+    # gapply on empty grouping columns.
+    df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    # Compute the sum of the second column, grouping on the first and third columns,
+    # and check whether the sum is larger than 2
+    schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
+                    "a INT, e BOOLEAN")
+    for (schema in schemas) {
+      df2 <- gapply(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+        },
+        schema)
+      actual <- collect(df2)$e
+      expected <- c(TRUE, TRUE)
+      expect_identical(actual, expected)
+
+      df2Collect <- gapplyCollect(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+          colnames(y) <- c("a", "e")
+          y
+        })
+      actual <- df2Collect$e
+      expect_identical(actual, expected)
+    }
+
+    # Compute the arithmetic mean of the second column, grouping on the first and
+    # third columns. Output the grouping values and the average.
+    schema <- structType(structField("a", "integer"), structField("c", "string"),
+                         structField("avg", "double"))
+    df3 <- gapply(
      df,
-      c(df$"a", df$"c"),
+      c("a", "c"),
      function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
      },
      schema)
-    actual <- collect(df2)$e
-    expected <- c(TRUE, TRUE)
+    actual <- collect(df3)
+    actual <- actual[order(actual$a), ]
+    rownames(actual) <- NULL
+    expected <- collect(select(df, "a", "b", "c"))
+    expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
+    colnames(expected) <- c("a", "c", "avg")
+    expected <- expected[order(expected$a), ]
+    rownames(expected) <- NULL
    expect_identical(actual, expected)
-    df2Collect <- gapplyCollect(
+    df3Collect <- gapplyCollect(
      df,
-      c(df$"a", df$"c"),
+      c("a", "c"),
      function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
-        colnames(y) <- c("a", "e")
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+        colnames(y) <- c("a", "c", "avg")
        y
      })
-    actual <- df2Collect$e
-    expect_identical(actual, expected)
-  }
-
-  # Computes the arithmetic mean of the second column by grouping
-  # on the first and third columns. Output the groupping value and the average.
-  schema <- structType(structField("a", "integer"), structField("c", "string"),
-                       structField("avg", "double"))
-  df3 <- gapply(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df3)
-  actual <- actual[order(actual$a), ]
-  rownames(actual) <- NULL
-  expected <- collect(select(df, "a", "b", "c"))
-  expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
-  colnames(expected) <- c("a", "c", "avg")
-  expected <- expected[order(expected$a), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
-
-  df3Collect <- gapplyCollect(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-      colnames(y) <- c("a", "c", "avg")
-      y
-    })
-  actual <- df3Collect[order(df3Collect$a), ]
-  expect_identical(actual$avg, expected$avg)
-
-  irisDF <- suppressWarnings(createDataFrame(iris))
-  schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
-  # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
-  df4 <- gapply(
-    cols = "Sepal_Length",
-    irisDF,
-    function(key, x) {
-      y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df4)
-  actual <- actual[order(actual$Sepal_Length), ]
-  rownames(actual) <- NULL
-  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
-                             stringsAsFactors = FALSE)
-  colnames(agg_local_df) <- c("Sepal_Length", "Avg")
-  expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
+    actual <- df3Collect[order(df3Collect$a), ]
+    expect_identical(actual$avg, expected$avg)
+
+    irisDF <- suppressWarnings(createDataFrame(iris))
+    schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
+    # Group by `Sepal_Length` and compute the average of `Sepal_Width`
+    df4 <- gapply(
+      cols = "Sepal_Length",
+      irisDF,
+      function(key, x) {
+        y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
+      },
+      schema)
+    actual <- collect(df4)
+    actual <- actual[order(actual$Sepal_Length), ]
+    rownames(actual) <- NULL
+    agg_local_df <- data.frame(aggregate(iris$Sepal.Width,
+                                         by = list(iris$Sepal.Length),
+                                         FUN = mean),
+                               stringsAsFactors = FALSE)
+    colnames(agg_local_df) <- c("Sepal_Length", "Avg")
+    expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
+    rownames(expected) <- NULL
+    expect_identical(actual, expected)
+  },
+  finally = {
+    # Restore the value saved before this test (not necessarily the default)
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
})
test_that("Window functions on a DataFrame", {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org