Posted to issues@spark.apache.org by "Enrico Minack (Jira)" <ji...@apache.org> on 2021/03/19 20:12:00 UTC
[jira] [Created] (SPARK-34806) Helper class for batch Dataset.observe()
Enrico Minack created SPARK-34806:
-------------------------------------
Summary: Helper class for batch Dataset.observe()
Key: SPARK-34806
URL: https://issues.apache.org/jira/browse/SPARK-34806
Project: Spark
Issue Type: New Feature
Components: SQL
Affects Versions: 3.2.0
Reporter: Enrico Minack
The {{observe}} method was added to the {{Dataset}} API in 3.0.0. It allows collecting aggregate metrics over the data of a Dataset while that data is being processed during an action.
These metrics are delivered in a separate thread to a registered {{QueryExecutionListener}} for batch datasets, or to a registered {{StreamingQueryListener}} for streaming datasets. In a streaming context it makes perfect sense to process incremental metrics in an event-based fashion. For simple batch dataset processing, however, a single result should be retrievable without registering listeners or handling threading.
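The threading burden described above can be illustrated without Spark. The sketch below is a simulation under stated assumptions: {{MetricsListener}} is a hypothetical stand-in for {{QueryExecutionListener}}, its callback runs on a separate thread the way Spark's listener bus invokes listeners, and the observed metrics are modeled as a {{Map[String, Long]}} instead of a {{Row}}. The user code must hand the result back to the calling thread, here with a {{CountDownLatch}} and an {{AtomicReference}}.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object ListenerBoilerplate {
  // Hypothetical stand-in for QueryExecutionListener (not Spark's API).
  trait MetricsListener {
    def onSuccess(metrics: Map[String, Long]): Unit
  }

  // Simulates an action whose observed metrics are delivered to the
  // listener on a separate thread, like Spark's listener bus would do.
  def runAction(listener: MetricsListener, simulatedMetrics: Map[String, Long]): Unit = {
    val busThread = new Thread(() => listener.onSuccess(simulatedMetrics))
    busThread.setDaemon(true)
    busThread.start()
  }

  // User-side boilerplate: register a listener, run the action, then block
  // until the listener thread hands over the metrics or the timeout expires.
  def observeOnce(simulatedMetrics: Map[String, Long], timeoutMs: Long): Option[Map[String, Long]] = {
    val result = new AtomicReference[Map[String, Long]]()
    val done = new CountDownLatch(1)
    val listener = new MetricsListener {
      def onSuccess(metrics: Map[String, Long]): Unit = {
        result.set(metrics) // publish the metrics before releasing the waiter
        done.countDown()
      }
    }
    runAction(listener, simulatedMetrics)
    if (done.await(timeoutMs, TimeUnit.MILLISECONDS)) Some(result.get) else None
  }
}
```

Every batch job that only wants a single result ends up repeating this latch-and-reference pattern. This is the complexity that a helper class could hide.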
Introducing an {{Observation}} helper class can hide that complexity for simple use-cases in batch processing.
This would be similar to how {{SparkContext}} provides {{AccumulatorV2}} instances (e.g. via {{SparkContext.longAccumulator}}): the {{SparkSession}} could provide a method that creates a new {{Observation}} instance and registers it with the session.
Alternatively, an {{Observation}} instance could be created on its own and register itself with {{Dataset.sparkSession}} when {{Observation.on(Dataset)}} is called. This "registration" adds a listener to the session that retrieves the metrics.
The {{Observation}} class provides methods to retrieve the metrics. Because retrieval has to wait for the listener to be called in a separate thread, all of these methods block until the metrics arrive, optionally with a timeout:
- {{Observation.get}} waits without a timeout and returns the metric.
- {{Observation.option(time, unit)}} waits at most {{time}} (measured in {{unit}}) and returns the metric as an {{Option}}, which is {{None}} if the timeout expires.
- {{Observation.waitCompleted(time, unit)}} waits at most {{time}} for the metrics and returns {{false}} if the timeout expires.
An action therefore has to be executed on the observed dataset before any of these methods can return. Otherwise {{option}} and {{waitCompleted}} time out, and {{get}} blocks indefinitely.
With {{Observation.reset}}, another action can be observed. Finally, {{Observation.close}} unregisters the listener from the session.
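The retrieval semantics proposed above can be sketched without Spark. Everything here is hypothetical and is not Spark's implementation. {{ObservationSketch}} and its {{deliver}} method are illustrative names, where {{deliver}} stands in for the session listener that would receive the metrics. The {{unregister}} hook stands in for removing that listener from the session. The metric type {{T}} replaces Spark's {{Row}}.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical sketch of the proposed helper (not Spark's API).
// T models the observed metrics; in Spark this would be a Row.
final class ObservationSketch[T](unregister: () => Unit = () => ()) {
  private var metric: Option[T] = None // guarded by this object's monitor
  private var closed = false

  // Called by the (simulated) listener thread when the observed action completes.
  def deliver(value: T): Unit = synchronized {
    metric = Some(value)
    notifyAll() // wake up callers blocked in get, option or waitCompleted
  }

  // Waits at most `time` for the metric; returns false on timeout.
  def waitCompleted(time: Long, unit: TimeUnit): Boolean = synchronized {
    val deadline = System.nanoTime() + unit.toNanos(time)
    var remaining = unit.toNanos(time)
    while (metric.isEmpty && remaining > 0) {
      TimeUnit.NANOSECONDS.timedWait(this, remaining) // releases the monitor while waiting
      remaining = deadline - System.nanoTime()
    }
    metric.isDefined
  }

  // Waits at most `time`; Some(metric) on success, None on timeout.
  def option(time: Long, unit: TimeUnit): Option[T] = synchronized {
    if (waitCompleted(time, unit)) metric else None
  }

  // Waits without a timeout; blocks forever if no action is ever observed.
  def get: T = synchronized {
    while (metric.isEmpty) wait() // loop guards against spurious wakeups
    metric.get
  }

  // Forgets the current metric so that the next action can be observed.
  def reset(): Unit = synchronized { metric = None }

  // Unregisters the listener (here: the hypothetical hook) at most once.
  def close(): Unit = synchronized {
    if (!closed) { closed = true; unregister() }
  }
}
```

Guarding a single {{Option}} field with the object's monitor keeps {{reset}} trivial. A {{Promise}} or {{CountDownLatch}} would also work, but would have to be replaced on every reset. Because {{get}} has no deadline, an action must run on the observed dataset before it is called.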