You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by sun-rui <gi...@git.apache.org> on 2016/05/08 12:06:23 UTC

[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

GitHub user sun-rui opened a pull request:

    https://github.com/apache/spark/pull/12989

    [SPARK-15202][SPARKR] add dapplyCollect() method for DataFrame in SparkR.

    ## What changes were proposed in this pull request?
    
    dapplyCollect() applies an R function on each partition of a SparkDataFrame and collects the result back to R as a data.frame.
    ```
    dapplyCollect(df, function(ldf) {...})
    ```
    
    ## How was this patch tested?
    SparkR unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sun-rui/spark SPARK-15202

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12989.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #12989
    
----
commit a728af5d73177235f0c79823a4cf50b8f850a9df
Author: Sun Rui <su...@gmail.com>
Date:   2016-05-08T12:01:24Z

    [SPARK-15202][SPARKR] add dapplyCollect() method for DataFrame in SparkR.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by sun-rui <gi...@git.apache.org>.
Github user sun-rui commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-218923967
  
    @shivaram,could you merge it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by sun-rui <gi...@git.apache.org>.
Github user sun-rui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62601728
  
    --- Diff: R/pkg/R/DataFrame.R ---
    @@ -1197,21 +1215,57 @@ setMethod("summarize",
     setMethod("dapply",
               signature(x = "SparkDataFrame", func = "function", schema = "structType"),
               function(x, func, schema) {
    -            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
    -                                         connection = NULL)
    -
    -            broadcastArr <- lapply(ls(.broadcastNames),
    -                                   function(name) { get(name, .broadcastNames) })
    -
    -            sdf <- callJStatic(
    -                     "org.apache.spark.sql.api.r.SQLUtils",
    -                     "dapply",
    -                     x@sdf,
    -                     serialize(cleanClosure(func), connection = NULL),
    -                     packageNamesArr,
    -                     broadcastArr,
    -                     schema$jobj)
    -            dataFrame(sdf)
    +            dapplyInternal(x, func, schema)
    +          })
    +
    +#' dapplyCollect
    +#'
    +#' Apply a function to each partition of a SparkDataFrame and collect the result back
    +#’ to R as a data.frame.
    +#'
    +#' @param x A SparkDataFrame
    +#' @param func A function to be applied to each partition of the SparkDataFrame.
    +#'             func should have only one parameter, to which a data.frame corresponds
    +#'             to each partition will be passed.
    +#'             The output of func should be a data.frame.
    +#' @family SparkDataFrame functions
    +#' @rdname dapply
    --- End diff --
    
    I think dapply and dapplyCollect are closely related. So I put them in the same RD. Is this acceptable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-218925573
  
    LGTM. Merging this to master, branch-2.0


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by NarineK <gi...@git.apache.org>.
Github user NarineK commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62716138
  
    --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
    @@ -2070,6 +2072,16 @@ test_that("dapply() on a DataFrame", {
       rownames(expected) <- NULL
       expect_identical(expected, result)
     
    +  result <- dapplyCollect(
    +              df,
    +              function(x) {
    +                y <- x[x$a > 1, ]
    +                y <- cbind(y, y$a + 1L)
    --- End diff --
    
    I see. In our previous product we could do that. Pass other parameters. I'll take a look and might have some suggestions.
    But I think this is not specifically related to this pull request.
    LGTM!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62587086
  
    --- Diff: R/pkg/R/DataFrame.R ---
    @@ -1197,21 +1215,57 @@ setMethod("summarize",
     setMethod("dapply",
               signature(x = "SparkDataFrame", func = "function", schema = "structType"),
               function(x, func, schema) {
    -            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
    -                                         connection = NULL)
    -
    -            broadcastArr <- lapply(ls(.broadcastNames),
    -                                   function(name) { get(name, .broadcastNames) })
    -
    -            sdf <- callJStatic(
    -                     "org.apache.spark.sql.api.r.SQLUtils",
    -                     "dapply",
    -                     x@sdf,
    -                     serialize(cleanClosure(func), connection = NULL),
    -                     packageNamesArr,
    -                     broadcastArr,
    -                     schema$jobj)
    -            dataFrame(sdf)
    +            dapplyInternal(x, func, schema)
    --- End diff --
    
    should we require schema != NULL here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by sun-rui <gi...@git.apache.org>.
Github user sun-rui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62633115
  
    --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
    @@ -2070,6 +2072,16 @@ test_that("dapply() on a DataFrame", {
       rownames(expected) <- NULL
       expect_identical(expected, result)
     
    +  result <- dapplyCollect(
    +              df,
    +              function(x) {
    +                y <- x[x$a > 1, ]
    +                y <- cbind(y, y$a + 1L)
    --- End diff --
    
    No, the R function can't take addtional arguments, as R worker don't know how to do it.
    If the additional data is small, it can be captured in the closure of the R function. Otherwise, broadcast variable can be used.
    Does this answer your question?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-217713372
  
    **[Test build #58104 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58104/consoleFull)** for PR 12989 at commit [`a728af5`](https://github.com/apache/spark/commit/a728af5d73177235f0c79823a4cf50b8f850a9df).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62587150
  
    --- Diff: R/pkg/R/DataFrame.R ---
    @@ -1197,21 +1215,57 @@ setMethod("summarize",
     setMethod("dapply",
               signature(x = "SparkDataFrame", func = "function", schema = "structType"),
               function(x, func, schema) {
    -            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
    -                                         connection = NULL)
    -
    -            broadcastArr <- lapply(ls(.broadcastNames),
    -                                   function(name) { get(name, .broadcastNames) })
    -
    -            sdf <- callJStatic(
    -                     "org.apache.spark.sql.api.r.SQLUtils",
    -                     "dapply",
    -                     x@sdf,
    -                     serialize(cleanClosure(func), connection = NULL),
    -                     packageNamesArr,
    -                     broadcastArr,
    -                     schema$jobj)
    -            dataFrame(sdf)
    +            dapplyInternal(x, func, schema)
    --- End diff --
    
    nevermind - it's in the signature.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-217713498
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58104/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by NarineK <gi...@git.apache.org>.
Github user NarineK commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62576985
  
    --- Diff: R/pkg/R/DataFrame.R ---
    @@ -1197,21 +1215,57 @@ setMethod("summarize",
     setMethod("dapply",
               signature(x = "SparkDataFrame", func = "function", schema = "structType"),
               function(x, func, schema) {
    -            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
    -                                         connection = NULL)
    -
    -            broadcastArr <- lapply(ls(.broadcastNames),
    -                                   function(name) { get(name, .broadcastNames) })
    -
    -            sdf <- callJStatic(
    -                     "org.apache.spark.sql.api.r.SQLUtils",
    -                     "dapply",
    -                     x@sdf,
    -                     serialize(cleanClosure(func), connection = NULL),
    -                     packageNamesArr,
    -                     broadcastArr,
    -                     schema$jobj)
    -            dataFrame(sdf)
    +            dapplyInternal(x, func, schema)
    +          })
    +
    +#' dapplyCollect
    +#'
    +#' Apply a function to each partition of a SparkDataFrame and collect the result back
    +#’ to R as a data.frame.
    +#'
    +#' @param x A SparkDataFrame
    +#' @param func A function to be applied to each partition of the SparkDataFrame.
    +#'             func should have only one parameter, to which a data.frame corresponds
    +#'             to each partition will be passed.
    +#'             The output of func should be a data.frame.
    +#' @family SparkDataFrame functions
    +#' @rdname dapply
    +#' @name dapplyCollect
    +#' @export
    +#' @examples
    +#' \dontrun{
    +#'   df <- createDataFrame (sqlContext, iris)
    +#'   ldf <- dapplyCollect(df, function(x) { x })
    +#'
    +#'   # filter and add a column
    +#'   df <- createDataFrame (
    +#'           sqlContext, 
    +#'           list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
    +#'           c("a", "b", "c"))
    +#'   ldf <- dapplyCollect(
    +#'            df,
    +#'            function(x) {
    +#'              y <- x[x[1] > 1, ]
    +#'              y <- cbind(y, y[1] + 1L)
    +#'            })
    +#'   # the result
    +#'   #       a b c d
    +#'   #       2 2 2 3
    +#'   #       3 3 3 4
    --- End diff --
    
    These seem to be the same examples from dapply ;)
    Maybe we can have different ones here :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by NarineK <gi...@git.apache.org>.
Github user NarineK commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62592006
  
    --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
    @@ -2070,6 +2072,16 @@ test_that("dapply() on a DataFrame", {
       rownames(expected) <- NULL
       expect_identical(expected, result)
     
    +  result <- dapplyCollect(
    +              df,
    +              function(x) {
    +                y <- x[x$a > 1, ]
    +                y <- cbind(y, y$a + 1L)
    --- End diff --
    
    Maybe, we can add test cases where the function returns data.frame, list and other data structures. What do you think ? 
    In general, it seems that we only support a subset of R's data types and there might be issues when we train a model on a worker and try to pass that model to a predict to make predictions using a separate dapply method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-217713494
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by sun-rui <gi...@git.apache.org>.
Github user sun-rui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62601678
  
    --- Diff: R/pkg/R/DataFrame.R ---
    @@ -1197,21 +1215,57 @@ setMethod("summarize",
     setMethod("dapply",
               signature(x = "SparkDataFrame", func = "function", schema = "structType"),
               function(x, func, schema) {
    -            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
    -                                         connection = NULL)
    -
    -            broadcastArr <- lapply(ls(.broadcastNames),
    -                                   function(name) { get(name, .broadcastNames) })
    -
    -            sdf <- callJStatic(
    -                     "org.apache.spark.sql.api.r.SQLUtils",
    -                     "dapply",
    -                     x@sdf,
    -                     serialize(cleanClosure(func), connection = NULL),
    -                     packageNamesArr,
    -                     broadcastArr,
    -                     schema$jobj)
    -            dataFrame(sdf)
    +            dapplyInternal(x, func, schema)
    +          })
    +
    +#' dapplyCollect
    +#'
    +#' Apply a function to each partition of a SparkDataFrame and collect the result back
    +#’ to R as a data.frame.
    +#'
    +#' @param x A SparkDataFrame
    +#' @param func A function to be applied to each partition of the SparkDataFrame.
    +#'             func should have only one parameter, to which a data.frame corresponds
    +#'             to each partition will be passed.
    +#'             The output of func should be a data.frame.
    +#' @family SparkDataFrame functions
    +#' @rdname dapply
    +#' @name dapplyCollect
    +#' @export
    +#' @examples
    +#' \dontrun{
    +#'   df <- createDataFrame (sqlContext, iris)
    +#'   ldf <- dapplyCollect(df, function(x) { x })
    +#'
    +#'   # filter and add a column
    +#'   df <- createDataFrame (
    +#'           sqlContext, 
    +#'           list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
    +#'           c("a", "b", "c"))
    +#'   ldf <- dapplyCollect(
    +#'            df,
    +#'            function(x) {
    +#'              y <- x[x[1] > 1, ]
    +#'              y <- cbind(y, y[1] + 1L)
    +#'            })
    +#'   # the result
    +#'   #       a b c d
    +#'   #       2 2 2 3
    +#'   #       3 3 3 4
    --- End diff --
    
    I think same examples are OK. just demonstrate the usage.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/12989


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by NarineK <gi...@git.apache.org>.
Github user NarineK commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62619994
  
    --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
    @@ -2070,6 +2072,16 @@ test_that("dapply() on a DataFrame", {
       rownames(expected) <- NULL
       expect_identical(expected, result)
     
    +  result <- dapplyCollect(
    +              df,
    +              function(x) {
    +                y <- x[x$a > 1, ]
    +                y <- cbind(y, y$a + 1L)
    --- End diff --
    
    That is : Can I pass other arguments to the R function except the data frame


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by sun-rui <gi...@git.apache.org>.
Github user sun-rui commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-217777755
  
    cc @shivaram, @felixcheung , @NarineK 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-217712089
  
    **[Test build #58104 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58104/consoleFull)** for PR 12989 at commit [`a728af5`](https://github.com/apache/spark/commit/a728af5d73177235f0c79823a4cf50b8f850a9df).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by sun-rui <gi...@git.apache.org>.
Github user sun-rui commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62601918
  
    --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
    @@ -2070,6 +2072,16 @@ test_that("dapply() on a DataFrame", {
       rownames(expected) <- NULL
       expect_identical(expected, result)
     
    +  result <- dapplyCollect(
    +              df,
    +              function(x) {
    +                y <- x[x$a > 1, ]
    +                y <- cbind(y, y$a + 1L)
    --- End diff --
    
    @NarineK, not sure if I understand. the user input R function should return data.frame.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by NarineK <gi...@git.apache.org>.
Github user NarineK commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62619916
  
    --- Diff: R/pkg/inst/tests/testthat/test_sparkSQL.R ---
    @@ -2070,6 +2072,16 @@ test_that("dapply() on a DataFrame", {
       rownames(expected) <- NULL
       expect_identical(expected, result)
     
    +  result <- dapplyCollect(
    +              df,
    +              function(x) {
    +                y <- x[x$a > 1, ]
    +                y <- cbind(y, y$a + 1L)
    --- End diff --
    
    Well, let's assume I use dapply to train linear model. The model object contains a list of different attributes for example coefficients, residuals , ... etc. 
    model <- lm(Species ~ ., iris).
    dapply wraps my model in a data.frame which is fine.
    
    Now, as next I'd like to make predictions using dapply and the model which I've previously trained.
    This means, that I probably need something like this: 
    ```
    df1 <- dapply(
        df, 
       model,
        function (x, model) {
           predict (model, x, ... )
       }
    )
    ```
    Can we do something similar ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12989#discussion_r62587316
  
    --- Diff: R/pkg/R/DataFrame.R ---
    @@ -1197,21 +1215,57 @@ setMethod("summarize",
     setMethod("dapply",
               signature(x = "SparkDataFrame", func = "function", schema = "structType"),
               function(x, func, schema) {
    -            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
    -                                         connection = NULL)
    -
    -            broadcastArr <- lapply(ls(.broadcastNames),
    -                                   function(name) { get(name, .broadcastNames) })
    -
    -            sdf <- callJStatic(
    -                     "org.apache.spark.sql.api.r.SQLUtils",
    -                     "dapply",
    -                     x@sdf,
    -                     serialize(cleanClosure(func), connection = NULL),
    -                     packageNamesArr,
    -                     broadcastArr,
    -                     schema$jobj)
    -            dataFrame(sdf)
    +            dapplyInternal(x, func, schema)
    +          })
    +
    +#' dapplyCollect
    +#'
    +#' Apply a function to each partition of a SparkDataFrame and collect the result back
    +#’ to R as a data.frame.
    +#'
    +#' @param x A SparkDataFrame
    +#' @param func A function to be applied to each partition of the SparkDataFrame.
    +#'             func should have only one parameter, to which a data.frame corresponds
    +#'             to each partition will be passed.
    +#'             The output of func should be a data.frame.
    +#' @family SparkDataFrame functions
    +#' @rdname dapply
    --- End diff --
    
    should this be documented under dapply or dapplyCollect?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-218014582
  
    LGTM!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15202][SPARKR] add dapplyCollect() meth...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/12989#issuecomment-218924180
  
    @sun-rui Taking a final pass now


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org