You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/04/06 02:21:47 UTC

spark git commit: [SPARK-14353] Dataset Time Window `window` API for R

Repository: spark
Updated Branches:
  refs/heads/master 48682f6bf -> 1146c534d


[SPARK-14353] Dataset Time Window `window` API for R

## What changes were proposed in this pull request?

The `window` function was added to Dataset with [this PR](https://github.com/apache/spark/pull/12008).
This PR adds the R API for this function.

With this PR, SQL, Java, and Scala share the same API, in that users can call:
 - `window(timeColumn, windowDuration)`
 - `window(timeColumn, windowDuration, slideDuration)`
 - `window(timeColumn, windowDuration, slideDuration, startTime)`

In Python and R, users can use all of the forms above; in addition, they can call:
 - In R:
   `window(timeColumn, windowDuration, startTime=...)`

that is, they can provide the startTime without providing the `slideDuration`. In this case, we will generate tumbling windows.

## How was this patch tested?

Unit tests + manual tests

Author: Burak Yavuz <br...@gmail.com>

Closes #12141 from brkyvz/R-windows.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1146c534
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1146c534
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1146c534

Branch: refs/heads/master
Commit: 1146c534d6c3806f3e920043ba06838ef02cd7e8
Parents: 48682f6
Author: Burak Yavuz <br...@gmail.com>
Authored: Tue Apr 5 17:21:41 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Tue Apr 5 17:21:41 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/functions.R                       | 63 ++++++++++++++++++++++++++
 R/pkg/R/generics.R                        |  4 ++
 R/pkg/inst/tests/testthat/test_context.R  |  2 +-
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 36 +++++++++++++++
 5 files changed, 105 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1146c534/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index fa3fb0b..f48c61c 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -265,6 +265,7 @@ exportMethods("%in%",
               "var_samp",
               "weekofyear",
               "when",
+              "window",
               "year")
 
 exportClasses("GroupedData")

http://git-wip-us.apache.org/repos/asf/spark/blob/1146c534/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index d9c10b4..db877b2 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2131,6 +2131,69 @@ setMethod("from_unixtime", signature(x = "Column"),
             column(jc)
           })
 
+#' window
+#'
+#' Bucketize rows into one or more time windows given a timestamp specifying column. Window
+#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window
+#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows on
+#' the order of months are not supported.
+#'
+#' @param x a time Column. Must be of TimestampType.
+#' @param windowDuration a string specifying the width of the window, e.g. '1 second',
+#'   '1 day 12 hours', '2 minutes'. Valid interval strings are 'week', 'day', 'hour', 'minute',
+#'   'second', 'millisecond', 'microsecond'.
+#' @param slideDuration a string, in the same format as \code{windowDuration}, specifying how
+#'   often a new window starts. If not provided, the windows will be tumbling windows.
+#' @param startTime a string, in the same format, giving the offset with respect to
+#'   1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have
+#'   hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15,
+#'   13:15-14:15... provide \code{startTime} as \code{15 minutes}.
+#' @return An output column of struct called 'window' by default, with the nested columns
+#'   'start' and 'end'.
+#'
+#' @family datetime_funcs
+#' @rdname window
+#' @name window
+#' @export
+#' @examples
+#'\dontrun{
+#'   # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10,
+#'   # 09:00:25-09:01:25, 09:00:40-09:01:40, ...
+#'   window(df$time, "1 minute", "15 seconds", "10 seconds")
+#'
+#'   # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15,
+#'   # 09:01:15-09:02:15...
+#'   window(df$time, "1 minute", startTime = "15 seconds")
+#'
+#'   # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ...
+#'   window(df$time, "30 seconds", "10 seconds")
+#'}
+setMethod("window", signature(x = "Column"),
+          function(x, windowDuration, slideDuration = NULL, startTime = NULL) {
+            stopifnot(is.character(windowDuration))
+            if (!is.null(slideDuration) && !is.null(startTime)) {
+              stopifnot(is.character(slideDuration) && is.character(startTime))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, slideDuration, startTime)
+            } else if (!is.null(slideDuration)) {
+              stopifnot(is.character(slideDuration))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, slideDuration)
+            } else if (!is.null(startTime)) {  # no slide: reuse window width => tumbling
+              stopifnot(is.character(startTime))
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration, windowDuration, startTime)
+            } else {
+              jc <- callJStatic("org.apache.spark.sql.functions",
+                                "window",
+                                x@jc, windowDuration)
+            }
+            column(jc)
+          })
+
 #' locate
 #'
 #' Locate the position of the first occurrence of substr.

http://git-wip-us.apache.org/repos/asf/spark/blob/1146c534/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index c6990f4..ecdeea5 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1152,6 +1152,10 @@ setGeneric("var_samp", function(x) { standardGeneric("var_samp") })
 #' @export
 setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })
 
+#' @rdname window
+#' @export
+setGeneric("window", function(x, ...) { standardGeneric("window") })  # masks stats::window()
+
 #' @rdname year
 #' @export
 setGeneric("year", function(x) { standardGeneric("year") })

http://git-wip-us.apache.org/repos/asf/spark/blob/1146c534/R/pkg/inst/tests/testthat/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index ad3f972..6e06c97 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
   maskedBySparkR <- masked[funcSparkROrEmpty]
   namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
                      "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
-                     "summary", "transform", "drop")
+                     "summary", "transform", "drop", "window")
   expect_equal(length(maskedBySparkR), length(namesOfMasked))
   expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
   # above are those reported as masked when `library(SparkR)`

http://git-wip-us.apache.org/repos/asf/spark/blob/1146c534/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index eef365b..22eb3ec 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1204,6 +1204,42 @@ test_that("greatest() and least() on a DataFrame", {
   expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3))
 })
 
+test_that("time windowing (window()) with all inputs", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds")
+  local <- collect(df)$v
+  # Smoke test of the four-argument form: only `v` is asserted, because the window bounds
+  # themselves may differ across time zones.
+  expect_equal(local, c(1))
+})
+
+test_that("time windowing (window()) with slide duration", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", "2 seconds")
+  local <- collect(df)$v
+  # 5-second windows sliding every 2 seconds overlap, so the single input row is expected once
+  # per containing window (twice here). Window bounds are not asserted (time zone dependent).
+  expect_equal(local, c(1, 1))
+})
+
+test_that("time windowing (window()) with start time", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds", startTime = "2 seconds")
+  local <- collect(df)$v
+  # Only `startTime` is given, so the windows tumble and the row is expected exactly once.
+  # Window bounds are not asserted because they may differ across time zones.
+  expect_equal(local, c(1))
+})
+
+test_that("time windowing (window()) with just window duration", {
+  df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1)))
+  df$window <- window(df$t, "5 seconds")
+  local <- collect(df)$v
+  # Smoke test of the two-argument form: only `v` is asserted, because the window bounds
+  # themselves may differ across time zones.
+  expect_equal(local, c(1))
+})
+
 test_that("when(), otherwise() and ifelse() on a DataFrame", {
   l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
   df <- createDataFrame(sqlContext, l)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org