You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2017/11/27 01:09:58 UTC

spark git commit: [SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions running R worker in few tests to speed up

Repository: spark
Updated Branches:
  refs/heads/master fba63c1a7 -> d49d9e403


[SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions running R worker in few tests to speed up

## What changes were proposed in this pull request?

This is a followup to reduce AppVeyor test time. This PR proposes to reduce the number of shuffle partitions to reduce the tasks running R workers in few particular tests.

The symptom is similar as described in `https://github.com/apache/spark/pull/19722`. There are many R processes newly launched on Windows without forking and it makes the differences of elapsed time between Linux and Windows.

Here is the simple comparison for before/after of this change. I manually tested this by disabling `spark.sparkr.use.daemon`. Disabling it resembles the tests on Windows:

**Before**

<img width="672" alt="2017-11-25 12 22 13" src="https://user-images.githubusercontent.com/6477701/33217949-b5528dfa-d17d-11e7-8050-75675c39eb20.png">

**After**

<img width="682" alt="2017-11-25 12 32 00" src="https://user-images.githubusercontent.com/6477701/33217958-c6518052-d17d-11e7-9f8e-1be21a784559.png">

So, this probably will reduce roughly more than 10 minutes.

## How was this patch tested?

AppVeyor tests

Author: hyukjinkwon <gu...@gmail.com>

Closes #19816 from HyukjinKwon/SPARK-21693-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d49d9e40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d49d9e40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d49d9e40

Branch: refs/heads/master
Commit: d49d9e40383209eed9584a4ef2c3964f27f4a08f
Parents: fba63c1
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Nov 27 10:09:53 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Mon Nov 27 10:09:53 2017 +0900

----------------------------------------------------------------------
 R/pkg/tests/fulltests/test_sparkSQL.R | 267 ++++++++++++++++-------------
 1 file changed, 148 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d49d9e40/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 00217c8..d87f5d2 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3021,41 +3021,54 @@ test_that("dapplyCollect() on DataFrame with a binary column", {
 })
 
 test_that("repartition by columns on DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-
-  # no column and number of partitions specified
-  retError <- tryCatch(repartition(df), error = function(e) e)
-  expect_equal(grepl
-    ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
-
-  # repartition by column and number of partitions
-  actual <- repartition(df, 3, col = df$"a")
-
-  # Checking that at least the dimensions are identical
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 3L)
-
-  # repartition by number of partitions
-  actual <- repartition(df, 13L)
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 13L)
-
-  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
-
-  # a test case with a column and dapply
-  schema <-  structType(structField("a", "integer"), structField("avg", "double"))
-  df <- repartition(df, col = df$"a")
-  df1 <- dapply(
-    df,
-    function(x) {
-      y <- (data.frame(x$a[1], mean(x$b)))
-    },
-    schema)
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+
+    # no column and number of partitions specified
+    retError <- tryCatch(repartition(df), error = function(e) e)
+    expect_equal(grepl
+      ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
+
+    # repartition by column and number of partitions
+    actual <- repartition(df, 3, col = df$"a")
+
+    # Checking that at least the dimensions are identical
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 3L)
+
+    # repartition by number of partitions
+    actual <- repartition(df, 13L)
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 13L)
+
+    expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
+
+    # a test case with a column and dapply
+    schema <-  structType(structField("a", "integer"), structField("avg", "double"))
+    df <- repartition(df, col = df$"a")
+
+    df1 <- dapply(
+      df,
+      function(x) {
+        y <- (data.frame(x$a[1], mean(x$b)))
+      },
+      schema)
 
-  # Number of partitions is equal to 2
-  expect_equal(nrow(df1), 2)
+    # Number of partitions is equal to 2
+    expect_equal(nrow(df1), 2)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })
 
 test_that("coalesce, repartition, numPartitions", {
@@ -3078,101 +3091,117 @@ test_that("coalesce, repartition, numPartitions", {
 })
 
 test_that("gapply() and gapplyCollect() on a DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-  expected <- collect(df)
-  df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
-  expect_identical(df1Collect, expected)
-
-  # gapply on empty grouping columns.
-  df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  # Computes the sum of second column by grouping on the first and third columns
-  # and checks if the sum is larger than 2
-  schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
-                  "a INT, e BOOLEAN")
-  for (schema in schemas) {
-    df2 <- gapply(
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  # TODO: Lower number of 'spark.sql.shuffle.partitions' causes test failures
+  # for an unknown reason. Probably we should fix it.
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "16")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+    expected <- collect(df)
+    df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
+    expect_identical(df1Collect, expected)
+
+    # gapply on empty grouping columns.
+    df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    # Computes the sum of second column by grouping on the first and third columns
+    # and checks if the sum is larger than 2
+    schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
+                    "a INT, e BOOLEAN")
+    for (schema in schemas) {
+      df2 <- gapply(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+        },
+        schema)
+      actual <- collect(df2)$e
+      expected <- c(TRUE, TRUE)
+      expect_identical(actual, expected)
+
+      df2Collect <- gapplyCollect(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+          colnames(y) <- c("a", "e")
+          y
+        })
+      actual <- df2Collect$e
+      expect_identical(actual, expected)
+    }
+
+    # Computes the arithmetic mean of the second column by grouping
+    # on the first and third columns. Output the groupping value and the average.
+    schema <-  structType(structField("a", "integer"), structField("c", "string"),
+                          structField("avg", "double"))
+    df3 <- gapply(
       df,
-      c(df$"a", df$"c"),
+      c("a", "c"),
       function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
       },
       schema)
-    actual <- collect(df2)$e
-    expected <- c(TRUE, TRUE)
+    actual <- collect(df3)
+    actual <- actual[order(actual$a), ]
+    rownames(actual) <- NULL
+    expected <- collect(select(df, "a", "b", "c"))
+    expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
+    colnames(expected) <- c("a", "c", "avg")
+    expected <- expected[order(expected$a), ]
+    rownames(expected) <- NULL
     expect_identical(actual, expected)
 
-    df2Collect <- gapplyCollect(
+    df3Collect <- gapplyCollect(
       df,
-      c(df$"a", df$"c"),
+      c("a", "c"),
       function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
-        colnames(y) <- c("a", "e")
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+        colnames(y) <- c("a", "c", "avg")
         y
       })
-      actual <- df2Collect$e
-      expect_identical(actual, expected)
-  }
-
-  # Computes the arithmetic mean of the second column by grouping
-  # on the first and third columns. Output the groupping value and the average.
-  schema <-  structType(structField("a", "integer"), structField("c", "string"),
-               structField("avg", "double"))
-  df3 <- gapply(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df3)
-  actual <-  actual[order(actual$a), ]
-  rownames(actual) <- NULL
-  expected <- collect(select(df, "a", "b", "c"))
-  expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
-  colnames(expected) <- c("a", "c", "avg")
-  expected <-  expected[order(expected$a), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
-
-  df3Collect <- gapplyCollect(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-      colnames(y) <- c("a", "c", "avg")
-      y
-    })
-  actual <- df3Collect[order(df3Collect$a), ]
-  expect_identical(actual$avg, expected$avg)
-
-  irisDF <- suppressWarnings(createDataFrame(iris))
-  schema <-  structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
-  # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
-  df4 <- gapply(
-    cols = "Sepal_Length",
-    irisDF,
-    function(key, x) {
-      y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df4)
-  actual <- actual[order(actual$Sepal_Length), ]
-  rownames(actual) <- NULL
-  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
-                    stringsAsFactors = FALSE)
-  colnames(agg_local_df) <- c("Sepal_Length", "Avg")
-  expected <-  agg_local_df[order(agg_local_df$Sepal_Length), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
+    actual <- df3Collect[order(df3Collect$a), ]
+    expect_identical(actual$avg, expected$avg)
+
+    irisDF <- suppressWarnings(createDataFrame(iris))
+    schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
+    # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
+    df4 <- gapply(
+      cols = "Sepal_Length",
+      irisDF,
+      function(key, x) {
+        y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
+      },
+      schema)
+    actual <- collect(df4)
+    actual <- actual[order(actual$Sepal_Length), ]
+    rownames(actual) <- NULL
+    agg_local_df <- data.frame(aggregate(iris$Sepal.Width,
+                                         by = list(iris$Sepal.Length),
+                                         FUN = mean),
+                               stringsAsFactors = FALSE)
+    colnames(agg_local_df) <- c("Sepal_Length", "Avg")
+    expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
+    rownames(expected) <- NULL
+    expect_identical(actual, expected)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })
 
 test_that("Window functions on a DataFrame", {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org