You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/10/27 04:58:22 UTC

spark git commit: [SPARK-11209][SPARKR] Add window functions into SparkR [step 1].

Repository: spark
Updated Branches:
  refs/heads/master 82464fb2e -> dc3220ce1


[SPARK-11209][SPARKR] Add window functions into SparkR [step 1].

Author: Sun Rui <ru...@intel.com>

Closes #9193 from sun-rui/SPARK-11209.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc3220ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc3220ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc3220ce

Branch: refs/heads/master
Commit: dc3220ce11c7513b1452c82ee82cb86e908bcc2d
Parents: 82464fb
Author: Sun Rui <ru...@intel.com>
Authored: Mon Oct 26 20:58:18 2015 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Mon Oct 26 20:58:18 2015 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |  4 +
 R/pkg/R/functions.R                             | 98 ++++++++++++++++++++
 R/pkg/R/generics.R                              | 16 ++++
 R/pkg/inst/tests/test_sparkSQL.R                |  2 +
 .../apache/spark/api/r/RBackendHandler.scala    |  3 +-
 5 files changed, 122 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 52f7a01..b73bed3 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -119,6 +119,7 @@ exportMethods("%in%",
               "count",
               "countDistinct",
               "crc32",
+              "cumeDist",
               "date_add",
               "date_format",
               "date_sub",
@@ -150,8 +151,10 @@ exportMethods("%in%",
               "isNaN",
               "isNotNull",
               "isNull",
+              "lag",
               "last",
               "last_day",
+              "lead",
               "least",
               "length",
               "levenshtein",
@@ -177,6 +180,7 @@ exportMethods("%in%",
               "nanvl",
               "negate",
               "next_day",
+              "ntile",
               "otherwise",
               "pmod",
               "quarter",

http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index a72fb7b..366290f 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2013,3 +2013,101 @@ setMethod("ifelse",
                                 "otherwise", no)
               column(jc)
           })
+
+###################### Window functions ######################
+
+#' cumeDist
+#'
+#' Window function: returns the cumulative distribution of values within a window partition,
+#' i.e. the fraction of rows that are below the current row.
+#' 
+#'   N = total number of rows in the partition
+#'   cumeDist(x) = number of values before (and including) x / N
+#'   
+#' This is equivalent to the CUME_DIST function in SQL.
+#'
+#' @rdname cumeDist
+#' @name cumeDist
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{cumeDist()}
+setMethod("cumeDist",
+          signature(x = "missing"),
+          function() {
+            jc <- callJStatic("org.apache.spark.sql.functions", "cumeDist")
+            column(jc)
+          })
+
+#' lag
+#'
+#' Window function: returns the value that is `offset` rows before the current row, and
+#' `defaultValue` if there are fewer than `offset` rows before the current row. For example,
+#' an `offset` of one will return the previous row at any given point in the window partition.
+#' 
+#' This is equivalent to the LAG function in SQL.
+#'
+#' @rdname lag
+#' @name lag
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{lag(df$c)}
+setMethod("lag",
+          signature(x = "characterOrColumn", offset = "numeric", defaultValue = "ANY"),
+          function(x, offset, defaultValue = NULL) {
+            col <- if (class(x) == "Column") {
+              x@jc
+            } else {
+              x
+            }
+
+            jc <- callJStatic("org.apache.spark.sql.functions",
+                              "lag", col, as.integer(offset), defaultValue)
+            column(jc)
+          })
+
+#' lead
+#'
+#' Window function: returns the value that is `offset` rows after the current row, and
+#' `null` if there are fewer than `offset` rows after the current row. For example,
+#' an `offset` of one will return the next row at any given point in the window partition.
+#' 
+#' This is equivalent to the LEAD function in SQL.
+#'
+#' @rdname lead
+#' @name lead
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{lead(df$c)}
+setMethod("lead",
+          signature(x = "characterOrColumn", offset = "numeric", defaultValue = "ANY"),
+          function(x, offset, defaultValue = NULL) {
+            col <- if (class(x) == "Column") {
+              x@jc
+            } else {
+              x
+            }
+
+            jc <- callJStatic("org.apache.spark.sql.functions",
+                              "lead", col, as.integer(offset), defaultValue)
+            column(jc)
+          })
+
+#' ntile
+#'
+#' Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window
+#' partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the second
+#' quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
+#' 
+#' This is equivalent to the NTILE function in SQL.
+#'
+#' @rdname ntile
+#' @name ntile
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{ntile(1)}
+setMethod("ntile",
+          signature(x = "numeric"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "ntile", as.integer(x))
+            column(jc)
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 4a419f7..c11c3c8 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -714,6 +714,10 @@ setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct")
 #' @export
 setGeneric("crc32", function(x) { standardGeneric("crc32") })
 
+#' @rdname cumeDist
+#' @export
+setGeneric("cumeDist", function(x) { standardGeneric("cumeDist") })
+
 #' @rdname datediff
 #' @export
 setGeneric("datediff", function(y, x) { standardGeneric("datediff") })
@@ -790,6 +794,10 @@ setGeneric("instr", function(y, x) { standardGeneric("instr") })
 #' @export
 setGeneric("isNaN", function(x) { standardGeneric("isNaN") })
 
+#' @rdname lag
+#' @export
+setGeneric("lag", function(x, offset, defaultValue = NULL) { standardGeneric("lag") })
+
 #' @rdname last
 #' @export
 setGeneric("last", function(x) { standardGeneric("last") })
@@ -798,6 +806,10 @@ setGeneric("last", function(x) { standardGeneric("last") })
 #' @export
 setGeneric("last_day", function(x) { standardGeneric("last_day") })
 
+#' @rdname lead
+#' @export
+setGeneric("lead", function(x, offset, defaultValue = NULL) { standardGeneric("lead") })
+
 #' @rdname least
 #' @export
 setGeneric("least", function(x, ...) { standardGeneric("least") })
@@ -858,6 +870,10 @@ setGeneric("negate", function(x) { standardGeneric("negate") })
 #' @export
 setGeneric("next_day", function(y, x) { standardGeneric("next_day") })
 
+#' @rdname ntile
+#' @export
+setGeneric("ntile", function(x) { standardGeneric("ntile") })
+
 #' @rdname countDistinct
 #' @export
 setGeneric("n_distinct", function(x, ...) { standardGeneric("n_distinct") })

http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 540854d..e1d4499 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -829,6 +829,8 @@ test_that("column functions", {
   c9 <- signum(c) + sin(c) + sinh(c) + size(c) + soundex(c) + sqrt(c) + sum(c)
   c10 <- sumDistinct(c) + tan(c) + tanh(c) + toDegrees(c) + toRadians(c)
   c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c)
+  c12 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1)
+  c13 <- cumeDist() + ntile(1)
 
   df <- jsonFile(sqlContext, jsonPath)
   df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))

http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 2a792d8..0095548 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -224,7 +224,8 @@ private[r] class RBackendHandler(server: RBackend)
                 case _ => parameterType
               }
             }
-            if (!parameterWrapperType.isInstance(args(i))) {
+            if ((parameterType.isPrimitive || args(i) != null) &&
+                !parameterWrapperType.isInstance(args(i))) {
               argMatched = false
             }
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org