You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2016/09/07 06:40:43 UTC
spark git commit: [SPARK-16785] R dapply doesn't return array or raw
columns
Repository: spark
Updated Branches:
refs/heads/master eb1ab88a8 -> 9fccde4ff
[SPARK-16785] R dapply doesn't return array or raw columns
## What changes were proposed in this pull request?
Fixed bug in `dapplyCollect` by changing the `compute` function of `worker.R` to explicitly handle raw (binary) vectors.
cc shivaram
## How was this patch tested?
Unit tests
Author: Clark Fitzgerald <cl...@gmail.com>
Closes #14783 from clarkfitzg/SPARK-16785.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fccde4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fccde4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fccde4f
Branch: refs/heads/master
Commit: 9fccde4ff80fb0fd65a9e90eb3337965e4349de4
Parents: eb1ab88
Author: Clark Fitzgerald <cl...@gmail.com>
Authored: Tue Sep 6 23:40:37 2016 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Tue Sep 6 23:40:37 2016 -0700
----------------------------------------------------------------------
R/pkg/R/SQLContext.R | 4 ++++
R/pkg/R/utils.R | 15 +++++++++++++++
R/pkg/inst/tests/testthat/test_sparkSQL.R | 21 +++++++++++++++++++++
R/pkg/inst/tests/testthat/test_utils.R | 24 ++++++++++++++++++++++++
R/pkg/inst/worker/worker.R | 9 ++++++++-
5 files changed, 72 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9fccde4f/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 783df53..ce531c3 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -202,7 +202,10 @@ getDefaultSqlSource <- function() {
# TODO(davies): support sampling and infer type from NA
createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
sparkSession <- getSparkSession()
+
if (is.data.frame(data)) {
+ # Convert data into a list of rows. Each row is a list.
+
# get the names of columns, they will be put into RDD
if (is.null(schema)) {
schema <- names(data)
@@ -227,6 +230,7 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
data <- do.call(mapply, append(args, data))
}
+
if (is.list(data)) {
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
rdd <- parallelize(sc, data)
http://git-wip-us.apache.org/repos/asf/spark/blob/9fccde4f/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 2809ce5..248c575 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -697,3 +697,18 @@ isMasterLocal <- function(master) {
isSparkRShell <- function() {
grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
}
+
+# rbind a list of rows with raw (binary) columns
+#
+# @param inputData a list of rows, with each row a list
+# @return data.frame with raw columns as lists
+rbindRaws <- function(inputData){
+ row1 <- inputData[[1]]
+ rawcolumns <- ("raw" == sapply(row1, class))
+
+ listmatrix <- do.call(rbind, inputData)
+ # A dataframe with all list columns
+ out <- as.data.frame(listmatrix)
+ out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
+ out
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/9fccde4f/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index aac3f62..a9bd325 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2270,6 +2270,27 @@ test_that("dapply() and dapplyCollect() on a DataFrame", {
expect_identical(expected, result)
})
+test_that("dapplyCollect() on DataFrame with a binary column", {
+
+ df <- data.frame(key = 1:3)
+ df$bytes <- lapply(df$key, serialize, connection = NULL)
+
+ df_spark <- createDataFrame(df)
+
+ result1 <- collect(df_spark)
+ expect_identical(df, result1)
+
+ result2 <- dapplyCollect(df_spark, function(x) x)
+ expect_identical(df, result2)
+
+ # A data.frame with a single column of bytes
+ scb <- subset(df, select = "bytes")
+ scb_spark <- createDataFrame(scb)
+ result <- dapplyCollect(scb_spark, function(x) x)
+ expect_identical(scb, result)
+
+})
+
test_that("repartition by columns on DataFrame", {
df <- createDataFrame(
list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
http://git-wip-us.apache.org/repos/asf/spark/blob/9fccde4f/R/pkg/inst/tests/testthat/test_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 83e94a1..77f2529 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -183,4 +183,28 @@ test_that("overrideEnvs", {
expect_equal(config[["config_only"]], "ok")
})
+test_that("rbindRaws", {
+
+ # Mixed Column types
+ r <- serialize(1:5, connection = NULL)
+ r1 <- serialize(1, connection = NULL)
+ r2 <- serialize(letters, connection = NULL)
+ r3 <- serialize(1:10, connection = NULL)
+ inputData <- list(list(1L, r1, "a", r), list(2L, r2, "b", r),
+ list(3L, r3, "c", r))
+ expected <- data.frame(V1 = 1:3)
+ expected$V2 <- list(r1, r2, r3)
+ expected$V3 <- c("a", "b", "c")
+ expected$V4 <- list(r, r, r)
+ result <- rbindRaws(inputData)
+ expect_equal(expected, result)
+
+ # Single binary column
+ input <- list(list(r1), list(r2), list(r3))
+ expected <- subset(expected, select = "V2")
+ result <- setNames(rbindRaws(input), "V2")
+ expect_equal(expected, result)
+
+})
+
sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/9fccde4f/R/pkg/inst/worker/worker.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index debf018..cfe41de 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -36,7 +36,14 @@ compute <- function(mode, partition, serializer, deserializer, key,
# available since R 3.2.4. So we set the global option here.
oldOpt <- getOption("stringsAsFactors")
options(stringsAsFactors = FALSE)
- inputData <- do.call(rbind.data.frame, inputData)
+
+ # Handle binary data types
+ if ("raw" %in% sapply(inputData[[1]], class)) {
+ inputData <- SparkR:::rbindRaws(inputData)
+ } else {
+ inputData <- do.call(rbind.data.frame, inputData)
+ }
+
options(stringsAsFactors = oldOpt)
names(inputData) <- colNames
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org