Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/10/16 03:12:21 UTC

[GitHub] [spark] viirya commented on a change in pull request #26127: [SPARK-29348][SQL] Add observable Metrics for Streaming queries

URL: https://github.com/apache/spark/pull/26127#discussion_r335261651
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
 ##########
 @@ -1847,6 +1847,53 @@ class Dataset[T] private[sql](
   @scala.annotation.varargs
   def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs : _*)
 
+ /**
+  * Define (named) metrics to observe on the Dataset. This method returns an 'observed' Dataset
+  * that returns the same result as the input, with the following guarantees:
+  * - It will compute the defined aggregates (metrics) on all the data that is flowing through the
+  *   Dataset at that point.
+  * - It will report the value of the defined aggregate columns as soon as we reach a completion
+  *   point. A completion point is either the end of a query (batch mode) or the end of a streaming
+  *   epoch. The value of the aggregates only reflects the data processed since the previous
+  *   completion point.
+  *
+  * The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+  * more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+  * contain references to the input Dataset's columns must always be wrapped in an aggregate
+  * function.
+  *
+  * A user can observe these metrics by either adding
+  * [[org.apache.spark.sql.streaming.StreamingQueryListener]] or a
+  * [[org.apache.spark.sql.util.QueryExecutionListener]] to the spark session.
+  *
+  * {{{
+  *   // Observe row count (rc) and error row count (erc) in the streaming Dataset
+  *   val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
+  *   observed_ds.writeStream.format("...").start()
+  *
+  *   // Monitor the metrics using a listener.
+  *   spark.streams.addListener(new StreamingQueryListener() {
+  *     override def onQueryProgress(event: QueryProgressEvent): Unit = {
+  *       event.progress.observedMetrics.get("my_event").foreach { row =>
+  *         // Trigger if the number of errors exceeds 5 percent
+  *         val num_rows = row.getAs[Long]("rc")
+  *         val num_error_rows = row.getAs[Long]("erc")
+  *         val ratio = num_error_rows.toDouble / num_rows
+  *         if (ratio > 0.05) {
+  *           // Trigger alert
+  *         }
+  *       }
+  *     }
+  *   })
+  * }}}
+  *
+  * @group typedrel
+  * @since DBR-6.0
 
 Review comment:
   DBR-6.0?
