You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2018/01/04 05:43:20 UTC

spark git commit: [SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

Repository: spark
Updated Branches:
  refs/heads/master 7d045c5f0 -> df95a908b


[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <fe...@hotmail.com>

Closes #20129 from felixcheung/rwater.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df95a908
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df95a908
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df95a908

Branch: refs/heads/master
Commit: df95a908baf78800556636a76d58bba9b3dd943f
Parents: 7d045c5
Author: Felix Cheung <fe...@hotmail.com>
Authored: Wed Jan 3 21:43:14 2018 -0800
Committer: Felix Cheung <fe...@apache.org>
Committed: Wed Jan 3 21:43:14 2018 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |   1 +
 R/pkg/R/DataFrame.R                             |  96 ++++++++++++++++-
 R/pkg/R/SQLContext.R                            |   4 +-
 R/pkg/R/generics.R                              |   6 ++
 R/pkg/tests/fulltests/test_streaming.R          | 107 +++++++++++++++++++
 python/pyspark/sql/streaming.py                 |   4 +
 .../sql/execution/streaming/Triggers.scala      |   2 +-
 7 files changed, 214 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 3219c6f..c51eb0f 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -179,6 +179,7 @@ exportMethods("arrange",
               "with",
               "withColumn",
               "withColumnRenamed",
+              "withWatermark",
               "write.df",
               "write.jdbc",
               "write.json",

http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index fe238f6..9956f7e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
 #' isStreaming
 #'
 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
-#' as it arrives.
+#' as it arrives. A dataset that reads data from a streaming source must be executed as a
+#' \code{StreamingQuery} using \code{write.stream}.
 #'
 #' @param x A SparkDataFrame
 #' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
 #' @param df a streaming SparkDataFrame.
 #' @param source a name for external data source.
 #' @param outputMode one of 'append', 'complete', 'update'.
-#' @param ... additional argument(s) passed to the method.
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'        system. If specified, the output is laid out on the file system similar to Hive's
+#'        partitioning scheme.
+#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
+#'        '1 minute'. This is a trigger that runs a query periodically based on the processing
+#'        time. If value is '0 seconds', the query will run as fast as possible, this is the
+#'        default. Only one trigger can be set.
+#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
+#'        one batch of data in a streaming query then terminates the query. Only one trigger can be
+#'        set.
+#' @param ... additional external data source specific named options.
 #'
 #' @family SparkDataFrame functions
 #' @seealso \link{read.stream}
@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
 #' # console
 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
 #' # text stream
-#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp"
+#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
 #' # memory stream
 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
 #' head(sql("SELECT * from outs"))
@@ -3737,7 +3749,8 @@ setMethod("isStreaming",
 #' @note experimental
 setMethod("write.stream",
           signature(df = "SparkDataFrame"),
-          function(df, source = NULL, outputMode = NULL, ...) {
+          function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
+                   trigger.processingTime = NULL, trigger.once = NULL, ...) {
             if (!is.null(source) && !is.character(source)) {
               stop("source should be character, NULL or omitted. It is the data source specified ",
                    "in 'spark.sql.sources.default' configuration by default.")
@@ -3748,12 +3761,43 @@ setMethod("write.stream",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
+            cols <- NULL
+            if (!is.null(partitionBy)) {
+              if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
+                stop("All partitionBy column names should be characters.")
+              }
+              cols <- as.list(partitionBy)
+            }
+            jtrigger <- NULL
+            if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
+              if (!is.null(trigger.once)) {
+                stop("Multiple triggers not allowed.")
+              }
+              interval <- as.character(trigger.processingTime)
+              if (nchar(interval) == 0) {
+                stop("Value for trigger.processingTime must be a non-empty string.")
+              }
+              jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
+                                             "ProcessingTime",
+                                             interval)
+            } else if (!is.null(trigger.once) && !is.na(trigger.once)) {
+              if (!is.logical(trigger.once) || !trigger.once) {
+                stop("Value for trigger.once must be TRUE.")
+              }
+              jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
+            }
             options <- varargsToStrEnv(...)
             write <- handledCallJMethod(df@sdf, "writeStream")
             write <- callJMethod(write, "format", source)
             if (!is.null(outputMode)) {
               write <- callJMethod(write, "outputMode", outputMode)
             }
+            if (!is.null(cols)) {
+              write <- callJMethod(write, "partitionBy", cols)
+            }
+            if (!is.null(jtrigger)) {
+              write <- callJMethod(write, "trigger", jtrigger)
+            }
             write <- callJMethod(write, "options", options)
             ssq <- handledCallJMethod(write, "start")
             streamingQuery(ssq)
@@ -3967,3 +4011,47 @@ setMethod("broadcast",
             sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
             dataFrame(sdf)
           })
+
+#' withWatermark
+#'
+#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
+#' time before which we assume no more late data is going to arrive.
+#'
+#' Spark will use this watermark for several purposes:
+#' \itemize{
+#'  \item{-} To know when a given time window aggregation can be finalized and thus can be emitted
+#' when using output modes that do not allow updates.
+#'  \item{-} To minimize the amount of state that we need to keep for on-going aggregations.
+#' }
+#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
+#' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
+#' of coordinating this value across partitions, the actual watermark used is only guaranteed
+#' to be at least \code{delayThreshold} behind the actual event time.  In some cases we may still
+#' process records that arrive more than \code{delayThreshold} late.
+#'
+#' @param x a streaming SparkDataFrame
+#' @param eventTime a string specifying the name of the Column that contains the event time of the
+#'                  row.
+#' @param delayThreshold a string specifying the minimum delay to wait to data to arrive late,
+#'                       relative to the latest record that has been processed in the form of an
+#'                       interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
+#' @return a SparkDataFrame.
+#' @aliases withWatermark,SparkDataFrame,character,character-method
+#' @family SparkDataFrame functions
+#' @rdname withWatermark
+#' @name withWatermark
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
+#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+#' df <- withWatermark(df, "time", "10 minutes")
+#' }
+#' @note withWatermark since 2.3.0
+setMethod("withWatermark",
+          signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
+          function(x, eventTime, delayThreshold) {
+            sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
+            dataFrame(sdf)
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 3b7f71b..9d0a2d5 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -727,7 +727,9 @@ read.jdbc <- function(url, tableName,
 #' @param schema The data schema defined in structType or a DDL-formatted string, this is
 #'               required for file-based streaming data source
 #' @param ... additional external data source specific named options, for instance \code{path} for
-#'        file-based streaming data source
+#'        file-based streaming data source. \code{timeZone} to indicate a timezone to be used to 
+#'        parse timestamps in the JSON/CSV data sources or partition values; If it isn't set, it
+#'        uses the default value, session local timezone.
 #' @return SparkDataFrame
 #' @rdname read.stream
 #' @name read.stream

http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 5369c32..e0dde33 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -799,6 +799,12 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
 setGeneric("withColumnRenamed",
            function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })
 
+#' @rdname withWatermark
+#' @export
+setGeneric("withWatermark", function(x, eventTime, delayThreshold) {
+  standardGeneric("withWatermark")
+})
+
 #' @rdname write.df
 #' @export
 setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.df") })

http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/R/pkg/tests/fulltests/test_streaming.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_streaming.R b/R/pkg/tests/fulltests/test_streaming.R
index 54f40bb..a354d50 100644
--- a/R/pkg/tests/fulltests/test_streaming.R
+++ b/R/pkg/tests/fulltests/test_streaming.R
@@ -172,6 +172,113 @@ test_that("Terminated by error", {
   stopQuery(q)
 })
 
+test_that("PartitionBy", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  checkpointPath <- tempfile(pattern = "sparkr-test", fileext = ".checkpoint")
+  textPath <- tempfile(pattern = "sparkr-test", fileext = ".text")
+  df <- read.df(jsonPath, "json", stringSchema)
+  write.df(df, parquetPath, "parquet", "overwrite")
+
+  df <- read.stream(path = parquetPath, schema = stringSchema)
+
+  expect_error(write.stream(df, "json", path = textPath, checkpointLocation = "append",
+                            partitionBy = c(1, 2)),
+                            "All partitionBy column names should be characters")
+
+  q <- write.stream(df, "json", path = textPath, checkpointLocation = "append",
+                    partitionBy = "name")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  dirs <- list.files(textPath)
+  expect_equal(length(dirs[substring(dirs, 1, nchar("name=")) == "name="]), 3)
+
+  unlink(checkpointPath)
+  unlink(textPath)
+  unlink(parquetPath)
+})
+
+test_that("Watermark", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  schema <- structType(structField("value", "string"))
+  t <- Sys.time()
+  df <- as.DataFrame(lapply(list(t), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  df <- read.stream(path = parquetPath, schema = "value STRING")
+  df <- withColumn(df, "eventTime", cast(df$value, "timestamp"))
+  df <- withWatermark(df, "eventTime", "10 seconds")
+  counts <- count(group_by(df, "eventTime"))
+  q <- write.stream(counts, "memory", queryName = "times", outputMode = "append")
+
+  # first events
+  df <- as.DataFrame(lapply(list(t + 1, t, t + 2), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # advance watermark to 15
+  df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # old events, should be dropped
+  df <- as.DataFrame(lapply(list(t), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # evict events less than previous watermark
+  df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  times <- collect(sql("SELECT * FROM times"))
+  # looks like write timing can affect the first bucket; but it should be t
+  expect_equal(times[order(times$eventTime),][1, 2], 2)
+
+  stopQuery(q)
+  unlink(parquetPath)
+})
+
+test_that("Trigger", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  schema <- structType(structField("value", "string"))
+  df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  df <- read.stream(path = parquetPath, schema = "value STRING")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.processingTime = "", trigger.once = ""), "Multiple triggers not allowed.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.processingTime = ""),
+               "Value for trigger.processingTime must be a non-empty string.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.processingTime = "invalid"), "illegal argument")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.once = ""), "Value for trigger.once must be TRUE.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.once = FALSE), "Value for trigger.once must be TRUE.")
+
+  q <- write.stream(df, "memory", queryName = "times", outputMode = "append", trigger.once = TRUE)
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+  df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  expect_equal(nrow(collect(sql("SELECT * FROM times"))), 1)
+
+  stopQuery(q)
+  unlink(parquetPath)
+})
+
 unlink(jsonPath)
 unlink(jsonPathNa)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index fb228f9..24ae377 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -793,6 +793,10 @@ class DataStreamWriter(object):
         .. note:: Evolving.
 
         :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+                               Set a trigger that runs a query periodically based on the processing
+                               time. Only one trigger can be set.
+        :param once: if set to True, set a trigger that processes only one batch of data in a
+                     streaming query then terminates the query. Only one trigger can be set.
 
         >>> # trigger the query for execution every 5 seconds
         >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')

http://git-wip-us.apache.org/repos/asf/spark/blob/df95a908/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
index 271bc4d..19e3e55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.streaming.Trigger
 
 /**
- * A [[Trigger]] that process only one batch of data in a streaming query then terminates
+ * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
  * the query.
  */
 @Experimental


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org