Posted to commits@spark.apache.org by fe...@apache.org on 2016/10/12 17:32:43 UTC

spark git commit: [SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB

Repository: spark
Updated Branches:
  refs/heads/master d5580ebaa -> 5cc503f4f


[SPARK-17790][SPARKR] Support for parallelizing R data.frame larger than 2GB

## What changes were proposed in this pull request?
If the R data structure that is being parallelized is larger than `INT_MAX`, we use files to transfer the data to the JVM. The serialization protocol mimics Python pickling, which allows us to simply call `PythonRDD.readRDDFromFile` to create the RDD.
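For context, here is a minimal sketch (with purely illustrative sample data) of the framing that `writeToTempFile` in this patch writes to the temporary file: each slice is serialized with R's `serialize()` and stored as a 4-byte big-endian length followed by its raw bytes, which is the record layout `PythonRDD.readRDDFromFile` is expected to consume on the JVM side.

```R
# Illustrative sketch of the length-prefixed framing used for the file-based transfer.
slices <- list(serialize(1:3, connection = NULL),
               serialize(letters, connection = NULL))
fileName <- tempfile()
conn <- file(fileName, "wb")
for (slice in slices) {
  writeBin(as.integer(length(slice)), conn, endian = "big")  # 4-byte big-endian length prefix
  writeBin(slice, conn)                                      # raw serialized payload
}
close(conn)
# fileName would then be handed to RRDD.createRDDFromFile on the JVM side.
```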

I tested this on my MacBook. The following code works with this patch:
```R
intMax <- .Machine$integer.max
largeVec <- 1:intMax
rdd <- SparkR:::parallelize(sc, largeVec, 2)
```
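To exercise the file-based path without allocating a multi-gigabyte object, the threshold can be lowered via `spark.r.maxAllocationLimit`, as the new unit test does. A hedged sketch along those lines (it assumes `sparkSession` is the JVM SparkSession handle, as in the test file, and uses the internal `callJMethod` helper):

```R
# Shrink the limit so even a small data.frame takes the file-based path, then restore the default.
conf <- SparkR:::callJMethod(sparkSession, "conf")
SparkR:::callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
df <- createDataFrame(iris)   # serialized slices now exceed the limit, so a temp file is used
SparkR:::callJMethod(conf, "set", "spark.r.maxAllocationLimit",
                     toString(.Machine$integer.max / 10))   # back to the ~200MB default
stopifnot(dim(df) == dim(iris))   # the data round-trips intact
```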

## How was this patch tested?
* [x] Unit tests

Author: Hossein <ho...@databricks.com>

Closes #15375 from falaki/SPARK-17790.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cc503f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cc503f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cc503f4

Branch: refs/heads/master
Commit: 5cc503f4fe9737a4c7947a80eecac053780606df
Parents: d5580eb
Author: Hossein <ho...@databricks.com>
Authored: Wed Oct 12 10:32:38 2016 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Wed Oct 12 10:32:38 2016 -0700

----------------------------------------------------------------------
 R/pkg/R/context.R                               | 45 +++++++++++++++++++-
 R/pkg/inst/tests/testthat/test_sparkSQL.R       | 11 +++++
 .../apache/spark/api/r/RBackendHandler.scala    |  2 +-
 .../scala/org/apache/spark/api/r/RRDD.scala     | 13 ++++++
 4 files changed, 68 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5cc503f4/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index fe2f3e3..438d77a 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -87,6 +87,10 @@ objectFile <- function(sc, path, minPartitions = NULL) {
 #' in the list are split into \code{numSlices} slices and distributed to nodes
 #' in the cluster.
 #'
+#' If the size of the serialized slices is larger than spark.r.maxAllocationLimit (200MB by
+#' default), the function will write them to disk and send the file name to the JVM. The number
+#' of slices may also be increased to make sure each slice stays below that limit.
+#'
 #' @param sc SparkContext to use
 #' @param coll collection to parallelize
 #' @param numSlices number of partitions to create in the RDD
@@ -120,6 +124,11 @@ parallelize <- function(sc, coll, numSlices = 1) {
     coll <- as.list(coll)
   }
 
+  sizeLimit <- getMaxAllocationLimit(sc)
+  objectSize <- object.size(coll)
+
+  # For large objects we make sure the size of each slice is also smaller than sizeLimit
+  numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
   if (numSlices > length(coll))
     numSlices <- length(coll)
 
@@ -130,12 +139,44 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # 2-tuples of raws
   serializedSlices <- lapply(slices, serialize, connection = NULL)
 
-  jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
-                      "createRDDFromArray", sc, serializedSlices)
+  # The RPC backend cannot handle arguments larger than 2GB (INT_MAX).
+  # If the serialized data is safely below that threshold, we send it over the RPC channel.
+  # Otherwise, we write it to a file and send the file name.
+  if (objectSize < sizeLimit) {
+    jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
+  } else {
+    fileName <- writeToTempFile(serializedSlices)
+    jrdd <- tryCatch(callJStatic(
+        "org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices)),
+      finally = {
+        file.remove(fileName)
+    })
+  }
 
   RDD(jrdd, "byte")
 }
 
+getMaxAllocationLimit <- function(sc) {
+  conf <- callJMethod(sc, "getConf")
+  as.numeric(
+    callJMethod(conf,
+      "get",
+      "spark.r.maxAllocationLimit",
+      toString(.Machine$integer.max / 10) # Default to a safe value: 200MB
+  ))
+}
+
+writeToTempFile <- function(serializedSlices) {
+  fileName <- tempfile()
+  conn <- file(fileName, "wb")
+  for (slice in serializedSlices) {
+    writeBin(as.integer(length(slice)), conn, endian = "big")
+    writeBin(slice, conn, endian = "big")
+  }
+  close(conn)
+  fileName
+}
+
 #' Include this specified package on all workers
 #'
 #' This function can be used to include a package on all workers before the

http://git-wip-us.apache.org/repos/asf/spark/blob/5cc503f4/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 6155424..af81d05 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -208,6 +208,17 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })
 
+test_that("createDataFrame uses files for large objects", {
+  # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
+  conf <- callJMethod(sparkSession, "conf")
+  callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
+  df <- createDataFrame(iris)
+
+  # Resetting the conf back to default value
+  callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
+  expect_equal(dim(df), dim(iris))
+})
+
 test_that("read/write csv as DataFrame", {
   csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
   mockLinesCsv <- c("year,make,model,comment,blank",

http://git-wip-us.apache.org/repos/asf/spark/blob/5cc503f4/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 7d53482..1422ef8 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -168,7 +168,7 @@ private[r] class RBackendHandler(server: RBackend)
       }
     } catch {
       case e: Exception =>
-        logError(s"$methodName on $objId failed")
+        logError(s"$methodName on $objId failed", e)
         writeInt(dos, -1)
         // Writing the error message of the cause for the exception. This will be returned
         // to user in the R process.

http://git-wip-us.apache.org/repos/asf/spark/blob/5cc503f4/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 59c8429..a1a5eb8 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.api.python.PythonRDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -140,4 +141,16 @@ private[r] object RRDD {
   def createRDDFromArray(jsc: JavaSparkContext, arr: Array[Array[Byte]]): JavaRDD[Array[Byte]] = {
     JavaRDD.fromRDD(jsc.sc.parallelize(arr, arr.length))
   }
+
+  /**
+   * Create an RRDD given a temporary file name. This is used to create an RRDD when parallelize
+   * is called on large R objects.
+   *
+   * @param fileName name of the temporary file on the driver machine
+   * @param parallelism number of slices; defaults to 4
+   */
+  def createRDDFromFile(jsc: JavaSparkContext, fileName: String, parallelism: Int):
+  JavaRDD[Array[Byte]] = {
+    PythonRDD.readRDDFromFile(jsc, fileName, parallelism)
+  }
 }

