You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/01/27 02:46:04 UTC

[spark] branch master updated: [SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e8982ca  [SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame
e8982ca is described below

commit e8982ca7ad94e98d907babf2d6f1068b7cd064c6
Author: hyukjinkwon <gu...@apache.org>
AuthorDate: Sun Jan 27 10:45:49 2019 +0800

    [SPARK-25981][R] Enables Arrow optimization from R DataFrame to Spark DataFrame
    
    ## What changes were proposed in this pull request?
    
    This PR targets to support Arrow optimization for conversion from R DataFrame to Spark DataFrame.
    Like PySpark side, it falls back to non-optimization code path when it's unable to use Arrow optimization.
    
    This can be tested as below:
    
    ```bash
    $ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
    ```
    
    ```r
    collect(createDataFrame(mtcars))
    ```
    
    ### Requirements
      - R 3.5.x
      - Arrow package 0.12+
        ```bash
        Rscript -e 'remotes::install_github("apache/arrowapache-arrow-0.12.0", subdir = "r")'
        ```
    
    **Note:** currently, Arrow R package is not in CRAN. Please take a look at ARROW-3204.
    **Note:** currently, Arrow R package seems not supporting Windows. Please take a look at ARROW-3204.
    
    ### Benchmarks
    
    **Shall**
    
    ```bash
    sync && sudo purge
    ./bin/sparkR --conf spark.sql.execution.arrow.enabled=false
    ```
    
    ```bash
    sync && sudo purge
    ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
    ```
    
    **R code**
    
    ```r
    createDataFrame(mtcars) # Initializes
    rdf <- read.csv("500000.csv")
    
    test <- function() {
      options(digits.secs = 6) # milliseconds
      start.time <- Sys.time()
      createDataFrame(rdf)
      end.time <- Sys.time()
      time.taken <- end.time - start.time
      print(time.taken)
    }
    
    test()
    ```
    
    **Data (350 MB):**
    
    ```r
    object.size(read.csv("500000.csv"))
    350379504 bytes
    ```
    
    "500000 Records"  http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/
    
    **Results**
    
    ```
    Time difference of 29.9468 secs
    ```
    
    ```
    Time difference of 3.222129 secs
    ```
    
    The performance improvement was around **950%**.
    Actually, this PR improves around **1200%**+ because this PR includes a small optimization about regular R DataFrame -> Spark DatFrame. See https://github.com/apache/spark/pull/22954#discussion_r231847272
    
    ### Limitations:
    
    For now, Arrow optimization with R does not support when the data is `raw`, and when user explicitly gives float type in the schema. They produce corrupt values.
    In this case, we decide to fall back to non-optimization code path.
    
    ## How was this patch tested?
    
    Small test was added.
    
    I manually forced to set this optimization `true` for _all_ R tests and they were _all_ passed (with few of fallback warnings).
    
    **TODOs:**
    - [x] Draft codes
    - [x] make the tests passed
    - [x] make the CRAN check pass
    - [x] Performance measurement
    - [x] Supportability investigation (for instance types)
    - [x] Wait for Arrow 0.12.0 release
    - [x] Fix and match it to Arrow 0.12.0
    
    Closes #22954 from HyukjinKwon/r-arrow-createdataframe.
    
    Lead-authored-by: hyukjinkwon <gu...@apache.org>
    Co-authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 R/pkg/R/SQLContext.R                               | 161 +++++++++++++++++----
 R/pkg/R/context.R                                  |  40 ++---
 R/pkg/tests/fulltests/test_sparkSQL.R              |  57 ++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    |   5 +-
 .../org/apache/spark/sql/api/r/SQLUtils.scala      |  22 +++
 5 files changed, 239 insertions(+), 46 deletions(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index afcdd6f..2e5506a 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -147,6 +147,70 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToFileInArrow <- function(fileName, rdf, numPartitions) {
+  requireNamespace1 <- requireNamespace
+
+  # R API in Arrow is not yet released in CRAN. CRAN requires to add the
+  # package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available
+  # or not. Therefore, it works around by avoiding direct requireNamespace.
+  # Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204.
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+    record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+    RecordBatchStreamWriter <- get(
+      "RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE)
+    FileOutputStream <- get(
+      "FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE)
+
+    numPartitions <- if (!is.null(numPartitions)) {
+      numToInt(numPartitions)
+    } else {
+      1
+    }
+
+    rdf_slices <- if (numPartitions > 1) {
+      split(rdf, makeSplits(numPartitions, nrow(rdf)))
+    } else {
+      list(rdf)
+    }
+
+    stream_writer <- NULL
+    tryCatch({
+      for (rdf_slice in rdf_slices) {
+        batch <- record_batch(rdf_slice)
+        if (is.null(stream_writer)) {
+          stream <- FileOutputStream(fileName)
+          schema <- batch$schema
+          stream_writer <- RecordBatchStreamWriter(stream, schema)
+        }
+
+        stream_writer$write_batch(batch)
+      }
+    },
+    finally = {
+      if (!is.null(stream_writer)) {
+        stream_writer$close()
+      }
+    })
+
+  } else {
+    stop("'arrow' package should be installed.")
+  }
+}
+
+checkTypeRequirementForArrow <- function(dataHead, schema) {
+  # Currenty Arrow optimization does not support raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, is.raw))) {
+    stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+    if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+      stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+    }
+  }
+}
+
 #' Create a SparkDataFrame
 #'
 #' Converts R data.frame or list into SparkDataFrame.
@@ -172,36 +236,76 @@ getDefaultSqlSource <- function() {
 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
                             numPartitions = NULL) {
   sparkSession <- getSparkSession()
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
+  useArrow <- FALSE
+  firstRow <- NULL
 
   if (is.data.frame(data)) {
-      # Convert data into a list of rows. Each row is a list.
-
-      # get the names of columns, they will be put into RDD
-      if (is.null(schema)) {
-        schema <- names(data)
-      }
+    # get the names of columns, they will be put into RDD
+    if (is.null(schema)) {
+      schema <- names(data)
+    }
 
-      # get rid of factor type
-      cleanCols <- function(x) {
-        if (is.factor(x)) {
-          as.character(x)
-        } else {
-          x
-        }
+    # get rid of factor type
+    cleanCols <- function(x) {
+      if (is.factor(x)) {
+        as.character(x)
+      } else {
+        x
       }
+    }
+    data[] <- lapply(data, cleanCols)
+
+    args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+    if (arrowEnabled) {
+      useArrow <- tryCatch({
+        stopifnot(length(data) > 0)
+        dataHead <- head(data, 1)
+        checkTypeRequirementForArrow(data, schema)
+        fileName <- tempfile(pattern = "sparwriteToFileInArrowk-arrow", fileext = ".tmp")
+        tryCatch({
+          writeToFileInArrow(fileName, data, numPartitions)
+          jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                                     "readArrowStreamFromFile",
+                                     sparkSession,
+                                     fileName)
+        },
+        finally = {
+          # File might not be created.
+          suppressWarnings(file.remove(fileName))
+        })
+
+        firstRow <- do.call(mapply, append(args, dataHead))[[1]]
+        TRUE
+      },
+      error = function(e) {
+        warning(paste0("createDataFrame attempted Arrow optimization because ",
+                       "'spark.sql.execution.arrow.enabled' is set to true; however, ",
+                       "failed, attempting non-optimization. Reason: ",
+                       e))
+        FALSE
+      })
+    }
 
+    if (!useArrow) {
+      # Convert data into a list of rows. Each row is a list.
       # drop factors and wrap lists
-      data <- setNames(lapply(data, cleanCols), NULL)
+      data <- setNames(as.list(data), NULL)
 
       # check if all columns have supported type
       lapply(data, getInternalType)
 
       # convert to rows
-      args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
       data <- do.call(mapply, append(args, data))
+      if (length(data) > 0) {
+        firstRow <- data[[1]]
+      }
+    }
   }
 
-  if (is.list(data)) {
+  if (useArrow) {
+    rdd <- jrddInArrow
+  } else if (is.list(data)) {
     sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
     if (!is.null(numPartitions)) {
       rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
@@ -215,14 +319,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   }
 
   if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
-    row <- firstRDD(rdd)
+    if (is.null(firstRow)) {
+      firstRow <- firstRDD(rdd)
+    }
     names <- if (is.null(schema)) {
-      names(row)
+      names(firstRow)
     } else {
       as.list(schema)
     }
     if (is.null(names)) {
-      names <- lapply(1:length(row), function(x) {
+      names <- lapply(1:length(firstRow), function(x) {
         paste("_", as.character(x), sep = "")
       })
     }
@@ -237,8 +343,8 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
       nn
     })
 
-    types <- lapply(row, infer_type)
-    fields <- lapply(1:length(row), function(i) {
+    types <- lapply(firstRow, infer_type)
+    fields <- lapply(1:length(firstRow), function(i) {
       structField(names[[i]], types[[i]], TRUE)
     })
     schema <- do.call(structType, fields)
@@ -246,10 +352,15 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
 
   stopifnot(class(schema) == "structType")
 
-  jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
-  srdd <- callJMethod(jrdd, "rdd")
-  sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
-                     srdd, schema$jobj, sparkSession)
+  if (useArrow) {
+    sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                       "toDataFrame", rdd, schema$jobj, sparkSession)
+  } else {
+    jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
+    srdd <- callJMethod(jrdd, "rdd")
+    sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
+                       srdd, schema$jobj, sparkSession)
+  }
   dataFrame(sdf)
 }
 
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 0207f24..bac3efd 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -81,6 +81,26 @@ objectFile <- function(sc, path, minPartitions = NULL) {
   RDD(jrdd, "byte")
 }
 
+makeSplits <- function(numSerializedSlices, length) {
+  # Generate the slice ids to put each row
+  # For instance, for numSerializedSlices of 22, length of 50
+  #  [1]  0  0  2  2  4  4  6  6  6  9  9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
+  # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
+  # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced.
+  # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD
+  if (numSerializedSlices > 0) {
+    unlist(lapply(0: (numSerializedSlices - 1), function(x) {
+      # nolint start
+      start <- trunc((as.numeric(x) * length) / numSerializedSlices)
+      end <- trunc(((as.numeric(x) + 1) * length) / numSerializedSlices)
+      # nolint end
+      rep(start, end - start)
+    }))
+  } else {
+    1
+  }
+}
+
 #' Create an RDD from a homogeneous list or vector.
 #'
 #' This function creates an RDD from a local homogeneous list in R. The elements
@@ -143,25 +163,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # For large objects we make sure the size of each slice is also smaller than sizeLimit
   numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit)))
 
-  # Generate the slice ids to put each row
-  # For instance, for numSerializedSlices of 22, length of 50
-  #  [1]  0  0  2  2  4  4  6  6  6  9  9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
-  # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
-  # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced.
-  # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD
-  splits <- if (numSerializedSlices > 0) {
-    unlist(lapply(0: (numSerializedSlices - 1), function(x) {
-      # nolint start
-      start <- trunc((as.numeric(x) * len) / numSerializedSlices)
-      end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
-      # nolint end
-      rep(start, end - start)
-    }))
-  } else {
-    1
-  }
-
-  slices <- split(coll, splits)
+  slices <- split(coll, makeSplits(numSerializedSlices, len))
 
   # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
   # 2-tuples of raws
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 88f2286..93cb890 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -307,6 +307,63 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })
 
+test_that("createDataFrame Arrow optimization", {
+  skip_if_not_installed("arrow")
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
+    expected <- collect(createDataFrame(mtcars))
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    expect_equal(collect(createDataFrame(mtcars)), expected)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("createDataFrame Arrow optimization - type specification", {
+  skip_if_not_installed("arrow")
+  rdf <- data.frame(list(list(a = 1,
+                              b = "a",
+                              c = TRUE,
+                              d = 1.1,
+                              e = 1L,
+                              f = as.Date("1990-02-24"),
+                              g = as.POSIXct("1990-02-24 12:34:56"))))
+
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+  conf <- callJMethod(sparkSession, "conf")
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
+    expected <- collect(createDataFrame(rdf))
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    expect_equal(collect(createDataFrame(rdf)), expected)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
 test_that("read/write csv as DataFrame", {
   if (windows_with_hadoop()) {
     csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index da595e7..4484273 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1286,8 +1286,9 @@ object SQLConf {
   val ARROW_EXECUTION_ENABLED =
     buildConf("spark.sql.execution.arrow.enabled")
       .doc("When true, make use of Apache Arrow for columnar data transfers. Currently available " +
-        "for use with pyspark.sql.DataFrame.toPandas, and " +
-        "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame. " +
+        "for use with pyspark.sql.DataFrame.toPandas, " +
+        "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame, " +
+        "and createDataFrame when its input is an R DataFrame. " +
         "The following data types are unsupported: " +
         "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.")
       .booleanConf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index f5d8d4e..693be99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -32,6 +32,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericRowWithSchema}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types._
@@ -237,4 +238,25 @@ private[sql] object SQLUtils extends Logging {
   def createArrayType(column: Column): ArrayType = {
     new ArrayType(ExprUtils.evalTypeExpr(column.expr), true)
   }
+
+  /**
+   * R callable function to read a file in Arrow stream format and create an `RDD`
+   * using each serialized ArrowRecordBatch as a partition.
+   */
+  def readArrowStreamFromFile(
+      sparkSession: SparkSession,
+      filename: String): JavaRDD[Array[Byte]] = {
+    ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename)
+  }
+
+  /**
+   * R callable function to create a `DataFrame` from a `JavaRDD` of serialized
+   * ArrowRecordBatches.
+   */
+  def toDataFrame(
+      arrowBatchRDD: JavaRDD[Array[Byte]],
+      schema: StructType,
+      sparkSession: SparkSession): DataFrame = {
+    ArrowConverters.toDataFrame(arrowBatchRDD, schema.json, sparkSession.sqlContext)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org