You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/10/27 04:58:22 UTC
spark git commit: [SPARK-11209][SPARKR] Add window functions into
SparkR [step 1].
Repository: spark
Updated Branches:
refs/heads/master 82464fb2e -> dc3220ce1
[SPARK-11209][SPARKR] Add window functions into SparkR [step 1].
Author: Sun Rui <ru...@intel.com>
Closes #9193 from sun-rui/SPARK-11209.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc3220ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc3220ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc3220ce
Branch: refs/heads/master
Commit: dc3220ce11c7513b1452c82ee82cb86e908bcc2d
Parents: 82464fb
Author: Sun Rui <ru...@intel.com>
Authored: Mon Oct 26 20:58:18 2015 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Mon Oct 26 20:58:18 2015 -0700
----------------------------------------------------------------------
R/pkg/NAMESPACE | 4 +
R/pkg/R/functions.R | 98 ++++++++++++++++++++
R/pkg/R/generics.R | 16 ++++
R/pkg/inst/tests/test_sparkSQL.R | 2 +
.../apache/spark/api/r/RBackendHandler.scala | 3 +-
5 files changed, 122 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 52f7a01..b73bed3 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -119,6 +119,7 @@ exportMethods("%in%",
"count",
"countDistinct",
"crc32",
+ "cumeDist",
"date_add",
"date_format",
"date_sub",
@@ -150,8 +151,10 @@ exportMethods("%in%",
"isNaN",
"isNotNull",
"isNull",
+ "lag",
"last",
"last_day",
+ "lead",
"least",
"length",
"levenshtein",
@@ -177,6 +180,7 @@ exportMethods("%in%",
"nanvl",
"negate",
"next_day",
+ "ntile",
"otherwise",
"pmod",
"quarter",
http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index a72fb7b..366290f 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2013,3 +2013,101 @@ setMethod("ifelse",
"otherwise", no)
column(jc)
})
+
+###################### Window functions ######################
+
+#' cumeDist
+#'
+#' Window function: returns the cumulative distribution of values within a window partition,
+#' i.e. the fraction of rows that are at or below the current row.
+#'
+#' N = total number of rows in the partition
+#' cumeDist(x) = number of values before (and including) x / N
+#'
+#' This is equivalent to the CUME_DIST function in SQL.
+#'
+#' @rdname cumeDist
+#' @name cumeDist
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{cumeDist()}
+setMethod("cumeDist",
+ signature(x = "missing"),
+ function() {
+ jc <- callJStatic("org.apache.spark.sql.functions", "cumeDist")
+ column(jc)
+ })
+
+#' lag
+#'
+#' Window function: returns the value that is `offset` rows before the current row, and
+#' `defaultValue` if there are fewer than `offset` rows before the current row. For example,
+#' an `offset` of one will return the previous row at any given point in the window partition.
+#'
+#' This is equivalent to the LAG function in SQL.
+#'
+#' @rdname lag
+#' @name lag
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{lag(df$c)}
+setMethod("lag",
+ signature(x = "characterOrColumn", offset = "numeric", defaultValue = "ANY"),
+ function(x, offset, defaultValue = NULL) {
+ col <- if (class(x) == "Column") {
+ x@jc
+ } else {
+ x
+ }
+
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "lag", col, as.integer(offset), defaultValue)
+ column(jc)
+ })
+
+#' lead
+#'
+#' Window function: returns the value that is `offset` rows after the current row, and
+#' `defaultValue` if there are fewer than `offset` rows after the current row. For example,
+#' an `offset` of one will return the next row at any given point in the window partition.
+#'
+#' This is equivalent to the LEAD function in SQL.
+#'
+#' @rdname lead
+#' @name lead
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{lead(df$c)}
+setMethod("lead",
+ signature(x = "characterOrColumn", offset = "numeric", defaultValue = "ANY"),
+ function(x, offset, defaultValue = NULL) {
+ col <- if (class(x) == "Column") {
+ x@jc
+ } else {
+ x
+ }
+
+ jc <- callJStatic("org.apache.spark.sql.functions",
+ "lead", col, as.integer(offset), defaultValue)
+ column(jc)
+ })
+
+#' ntile
+#'
+#' Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window
+#' partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the second
+#' quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
+#'
+#' This is equivalent to the NTILE function in SQL.
+#'
+#' @rdname ntile
+#' @name ntile
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{ntile(1)}
+setMethod("ntile",
+ signature(x = "numeric"),
+ function(x) {
+ jc <- callJStatic("org.apache.spark.sql.functions", "ntile", as.integer(x))
+ column(jc)
+ })
http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 4a419f7..c11c3c8 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -714,6 +714,10 @@ setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct")
#' @export
setGeneric("crc32", function(x) { standardGeneric("crc32") })
+#' @rdname cumeDist
+#' @export
+setGeneric("cumeDist", function(x) { standardGeneric("cumeDist") })
+
#' @rdname datediff
#' @export
setGeneric("datediff", function(y, x) { standardGeneric("datediff") })
@@ -790,6 +794,10 @@ setGeneric("instr", function(y, x) { standardGeneric("instr") })
#' @export
setGeneric("isNaN", function(x) { standardGeneric("isNaN") })
+#' @rdname lag
+#' @export
+setGeneric("lag", function(x, offset, defaultValue = NULL) { standardGeneric("lag") })
+
#' @rdname last
#' @export
setGeneric("last", function(x) { standardGeneric("last") })
@@ -798,6 +806,10 @@ setGeneric("last", function(x) { standardGeneric("last") })
#' @export
setGeneric("last_day", function(x) { standardGeneric("last_day") })
+#' @rdname lead
+#' @export
+setGeneric("lead", function(x, offset, defaultValue = NULL) { standardGeneric("lead") })
+
#' @rdname least
#' @export
setGeneric("least", function(x, ...) { standardGeneric("least") })
@@ -858,6 +870,10 @@ setGeneric("negate", function(x) { standardGeneric("negate") })
#' @export
setGeneric("next_day", function(y, x) { standardGeneric("next_day") })
+#' @rdname ntile
+#' @export
+setGeneric("ntile", function(x) { standardGeneric("ntile") })
+
#' @rdname countDistinct
#' @export
setGeneric("n_distinct", function(x, ...) { standardGeneric("n_distinct") })
http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 540854d..e1d4499 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -829,6 +829,8 @@ test_that("column functions", {
c9 <- signum(c) + sin(c) + sinh(c) + size(c) + soundex(c) + sqrt(c) + sum(c)
c10 <- sumDistinct(c) + tan(c) + tanh(c) + toDegrees(c) + toRadians(c)
c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c)
+ c12 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1)
+ c13 <- cumeDist() + ntile(1)
df <- jsonFile(sqlContext, jsonPath)
df2 <- select(df, between(df$age, c(20, 30)), between(df$age, c(10, 20)))
http://git-wip-us.apache.org/repos/asf/spark/blob/dc3220ce/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 2a792d8..0095548 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -224,7 +224,8 @@ private[r] class RBackendHandler(server: RBackend)
case _ => parameterType
}
}
- if (!parameterWrapperType.isInstance(args(i))) {
+ if ((parameterType.isPrimitive || args(i) != null) &&
+ !parameterWrapperType.isInstance(args(i))) {
argMatched = false
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org