Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/19 02:03:23 UTC

[GitHub] [spark] zhengruifeng commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

zhengruifeng commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051733557


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.

Review Comment:
   ```suggestion
   
   ```



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.
+        exprs : :class:`Column`
+            column expressions (:class:`Column`).
+
+        Returns
+        -------
+        :class:`DataFrame`
+            the observed :class:`DataFrame`.
+
+        Notes
+        -----
+        When ``observation`` is :class:`Observation`, this method only supports batch queries.
+        When ``observation`` is a string, this method works for both batch and streaming queries.
+        Continuous execution is currently not supported yet.
+        """
+        from pyspark.sql import Observation
+
+        if len(exprs) == 0:
+            raise ValueError("'exprs' should not be empty")
+        if not all(isinstance(c, Column) for c in exprs):
+            raise ValueError("all 'exprs' should be Column")
+
+        if isinstance(observation, Observation):
+            return DataFrame.withPlan(
+                plan.CollectMetrics(self._plan, str(observation._name), list(exprs), True),

Review Comment:
   Is this equivalent to the existing PySpark implementation? https://github.com/apache/spark/blob/c014fa2e18713f67deb072a4336286f4a3f4d3f4/python/pyspark/sql/observation.py#L86-L110



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0

Review Comment:
   ```suggestion
           .. versionadded:: 3.4.0
   ```



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.
+        exprs : :class:`Column`
+            column expressions (:class:`Column`).
+
+        Returns
+        -------
+        :class:`DataFrame`
+            the observed :class:`DataFrame`.
+
+        Notes
+        -----
+        When ``observation`` is :class:`Observation`, this method only supports batch queries.
+        When ``observation`` is a string, this method works for both batch and streaming queries.

Review Comment:
   `streaming queries` are out of scope for now
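
For readers following the batch versus streaming distinction in the docstring, here is a minimal pure-Python sketch of the "completion point" semantics it describes: metrics are reported at each completion point and only cover the rows processed since the previous one. This is not Spark code and not the PR's implementation. `MetricCollector`, `run_batch` and `run_epochs` are hypothetical names introduced only for illustration, and rows are assumed to be plain integers.

```python
from typing import Callable, Dict, Iterable, List


class MetricCollector:
    """Hypothetical stand-in for an observed dataset; not a Spark class."""

    def __init__(self, metrics: Dict[str, Callable[[List[int]], int]]) -> None:
        # Each metric is an aggregate over the rows seen since the last report.
        self._metrics = metrics
        self._seen: List[int] = []

    def process(self, rows: Iterable[int]) -> List[int]:
        # Rows pass through unchanged; observing must not alter the result.
        rows = list(rows)
        self._seen.extend(rows)
        return rows

    def complete(self) -> Dict[str, int]:
        # A completion point: report the aggregates, then reset the state so
        # the next report only reflects data processed after this point.
        report = {name: agg(self._seen) for name, agg in self._metrics.items()}
        self._seen = []
        return report


def run_batch(rows: List[int]) -> Dict[str, int]:
    # Batch mode: a single completion point at the end of the query.
    collector = MetricCollector({"count": len, "total": sum})
    collector.process(rows)
    return collector.complete()


def run_epochs(epochs: List[List[int]]) -> List[Dict[str, int]]:
    # Streaming mode (assumed shape): one completion point per epoch.
    collector = MetricCollector({"count": len, "total": sum})
    reports = []
    for epoch in epochs:
        collector.process(epoch)
        reports.append(collector.complete())
    return reports
```

In this sketch a batch run yields one report covering all rows, while a run split into epochs yields one report per epoch, each limited to that epoch's rows.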



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

