Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/16 12:26:32 UTC

[GitHub] [spark] beliefer opened a new pull request, #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

beliefer opened a new pull request, #39091:
URL: https://github.com/apache/spark/pull/39091

   ### What changes were proposed in this pull request?
   Implement `DataFrame.observe` with a proto message
   
   Implement `DataFrame.observe` for the Scala API
   Implement `DataFrame.observe` for the Python API
   
   
   ### Why are the changes needed?
   For Connect API coverage.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'. This is a new API.
   
   
   ### How was this patch tested?
   New test cases.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1358235780

   @beliefer can we just send them as part of the `ExecutePlanResponse` at the end of the query? Doing another RPC seems a bit wasteful, and it means we have to track query state in the server side session.
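
The idea of piggybacking the observed metrics on the result stream can be sketched as follows. This is only a minimal illustration in plain Python, not the Spark Connect implementation: the `ExecutePlanResponse` dataclass is a hypothetical stand-in for the proto message of the same name, and `execute_plan`, the metric name `my_metrics`, and the count/sum metrics are assumptions made up for the example.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Optional


@dataclass
class ExecutePlanResponse:
    """Hypothetical stand-in for the proto message of the same name."""

    batch: Optional[List[int]] = None
    # Observed metrics keyed by observation name; only set on the last response.
    observed_metrics: Dict[str, List[int]] = field(default_factory=dict)


def execute_plan(rows: List[int], batch_size: int) -> Iterator[ExecutePlanResponse]:
    """Stream result batches, then one trailing response carrying the metrics."""
    count, total = 0, 0
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        count += len(batch)
        total += sum(batch)
        yield ExecutePlanResponse(batch=batch)
    # The metrics are complete only after the whole query has run, so they
    # travel in the final message of the same stream instead of a second RPC.
    yield ExecutePlanResponse(observed_metrics={"my_metrics": [count, total]})


responses = list(execute_plan([1, 2, 3, 4, 5], batch_size=2))
print(len(responses))
print(responses[-1].observed_metrics)
```

With this shape the server keeps no per-query state after the stream ends, which is the concern raised about a separate RPC.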




[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055179837


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -45,6 +45,7 @@
     UnresolvedRegex,
 )
 from pyspark.sql.connect.functions import col, lit
+from pyspark.sql import Observation

Review Comment:
   First, users are already used to the `Observation` class from `pyspark.sql`, so I think we should use it too.
   Second, this PR only uses the name of the `Observation`, not any of its actions, so using it here is safe.





[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055511412


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   As outlined above, only this branch is needed; the two branches behave identically.





[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1359204069

   > @beliefer can we just send them as part of the `ExecutePlanResponse` at the end of the query? Doing another RPC seems a bit wasteful, and it means we have to track query state in the server side session.
   
   This is done now.




[GitHub] [spark] amaliujia commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051070657


##########
sql/core/src/main/scala/org/apache/spark/sql/Observation.scala:
##########
@@ -45,7 +45,7 @@ import org.apache.spark.sql.util.QueryExecutionListener
  * @param name name of the metric
  * @since 3.3.0
  */
-class Observation(name: String) {
+class Observation(val name: String) {

Review Comment:
   Ah, without `val`, `name` is only a constructor parameter and cannot be accessed from outside the class. Nice catch on this.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala:
##########
@@ -126,6 +126,20 @@ package object dsl {
           Expression.UnresolvedFunction.newBuilder().setFunctionName("min").addArguments(e))
         .build()
 
+    def proto_max(e: Expression): Expression =

Review Comment:
   Same for below





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055998439


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   OK. Let's only support the string.





[GitHub] [spark] hvanhovell commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1125285556


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -158,6 +159,9 @@ message ExecutePlanResponse {
   // batch of results and then represent the overall state of the query execution.
   Metrics metrics = 4;
 
+  // The metrics observed during the execution of the query plan.
+  optional ObservedMetrics observed_metrics = 5;

Review Comment:
   You could also just use a `repeated ObservedMetricsObject` here.





[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1455279364

   > @beliefer can you please remove the is_observation code path? And take another look at the protocol. Otherwise I think it looks good.
   
   The `is_observation` code path has been removed.




[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1058070139


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,18 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  // This message is only used by dataframe.observe if the observation is Observation.
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      DataType schema = 2;
+      repeated string values = 3;

Review Comment:
   The point of the previous comment was not necessarily to put the schema of the row in every metric, but to figure out whether it makes sense to just encode the values as literals for now.
   ```suggestion
         repeated Expression.Literal values = 2;
   ```
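
To illustrate the trade-off between `repeated string values` and typed literals, here is a minimal sketch in plain Python. The `Literal` dataclass and `to_literal` helper are hypothetical stand-ins for `Expression.Literal`, and the type names are assumptions chosen for the example; the real proto message is richer than this.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass(frozen=True)
class Literal:
    """Hypothetical stand-in for Expression.Literal: a value tagged with its type."""

    type_name: str
    value: Union[bool, int, float, str]


def to_literal(value: Union[bool, int, float, str]) -> Literal:
    """Wrap a Python value together with an (assumed) type name."""
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return Literal("boolean", value)
    if isinstance(value, int):
        return Literal("long", value)
    if isinstance(value, float):
        return Literal("double", value)
    if isinstance(value, str):
        return Literal("string", value)
    raise TypeError(f"unsupported metric value: {value!r}")


observed = [3, 1.5, "a"]

# String encoding: the client can no longer tell 3 (a number) from "3" (text)
# unless a schema is shipped alongside the values.
as_strings: List[str] = [str(v) for v in observed]

# Literal encoding: each value carries its own type, so no extra schema field
# is needed to decode it on the client.
as_literals: List[Literal] = [to_literal(v) for v in observed]
decoded = [lit.value for lit in as_literals]

print(as_strings)
print([lit.type_name for lit in as_literals])
```

The literal form is self-describing, which is why it removes the need for a separate `DataType schema` field in this sketch.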





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055396383


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      repeated string values = 2;

Review Comment:
   `SQLMetrics` have a name, a value and a metricType, whereas `ObservedMetrics` have only a name and values.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055997117


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  optional bool is_observation = 4;

Review Comment:
   If we do not use `Observation`, the Connect API will not be consistent with the PySpark API.
   cc @zhengruifeng @HyukjinKwon @cloud-fan 





[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1428966021

   > @beliefer will take a look today. Thanks for your hard work and patience!
   
   Thank you.




[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055999303


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   @hvanhovell I'm sorry. I am also working on support for `df.randomSplits`, and my reply just now confused the two. `df.observe` has no cache behavior. We can support only the string for now.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055337201


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   For example, `Unpivot`, `UnresolvedHint` and so on.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055990134


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   The branch above caches the logical plan, but the branch below does not.





[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1054795195


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   Does this actually make sense? Looking at the way this class is used, it seems that only `.observe` actually creates an instance of `CollectMetrics`. I'm not sure we're actually exposing this operator as such.
   
   @hvanhovell what's your perspective?



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -45,6 +45,7 @@
     UnresolvedRegex,
 )
 from pyspark.sql.connect.functions import col, lit
+from pyspark.sql import Observation

Review Comment:
   I'm not sure this is going to work. The tricky part is that the `Observation` instance heavily depends on the `_jvm` object, so I'm worried about using the type here: it will not even be possible to construct it when using Spark Connect.
   
   We have two ways out here:
   
   1) For now, we just use a string and add the `Observation` class as a follow-up.
   2) We add an `Observation` class now.
   
   Personally, to get this PR moving, I would vote for 1.
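
Option 1 can be sketched as a client-side `observe` that accepts only a string name. This is a minimal illustration in plain Python and not the PySpark Connect code: the `CollectMetrics` dataclass is a hypothetical mirror of the proto message, plans and expressions are represented as plain strings, and the free function `observe` is an assumption made for the example.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class CollectMetrics:
    """Hypothetical client-side plan node mirroring the proto message fields."""

    input_plan: str
    name: str
    metrics: Tuple[str, ...]


def observe(input_plan: str, observation: str, *exprs: str) -> CollectMetrics:
    """Build a CollectMetrics node from a string name and metric expressions."""
    # Only a plain string is accepted, so nothing here needs a JVM-backed
    # Observation object to be constructed on the client.
    if not isinstance(observation, str):
        raise TypeError("only a string observation name is supported for now")
    if not exprs:
        raise ValueError("at least one metric expression is required")
    return CollectMetrics(input_plan, observation, tuple(exprs))


plan = observe("range(10)", "my_metrics", "count(1)", "max(id)")
print(plan)
```

Because the node carries only the name, a later `Observation` class could be layered on top without changing the message shape.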
   
   
   



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -158,6 +158,9 @@ message ExecutePlanResponse {
   // batch of results and then represent the overall state of the query execution.
   Metrics metrics = 4;
 
+  // The metrics observed during the execution of the query plan.
+  ObservedMetrics observed_metrics = 5;

Review Comment:
   ```suggestion
     optional ObservedMetrics observed_metrics = 5;
   ```



##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  bool is_observation = 4;

Review Comment:
   What happens if this is false?
   ```suggestion
     optional bool is_observation = 4;
   ```



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      repeated string values = 2;

Review Comment:
   Is this equivalent to what we do for regular observations?



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {

Review Comment:
   Doc?



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala:
##########
@@ -619,6 +619,50 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan1, sparkPlan1)
   }
 
+  test("Test observe") {

Review Comment:
   Please add negative tests that check an analysis exception is thrown when non-aggregate functions are submitted.
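
The shape of such a negative test can be sketched in plain Python. This is a toy model, not the Spark analyzer: `AnalysisError` is a hypothetical stand-in for Spark's `AnalysisException`, the set of aggregate function names is an illustrative subset, and `check_metrics` only looks at function names, which is far simpler than the real analysis rules.

```python
from typing import Iterable

# Illustrative subset only; the real analyzer does not work from a name list.
AGGREGATE_FUNCTIONS = {"count", "sum", "min", "max", "avg"}


class AnalysisError(Exception):
    """Hypothetical stand-in for Spark's AnalysisException."""


def check_metrics(function_names: Iterable[str]) -> None:
    """Reject any observed metric that is not an aggregate function."""
    for name in function_names:
        if name not in AGGREGATE_FUNCTIONS:
            raise AnalysisError(f"non-aggregate function in observed metrics: {name}")


def raises_analysis_error(function_names: Iterable[str]) -> bool:
    """Return True if validating the given metrics raises AnalysisError."""
    try:
        check_metrics(function_names)
    except AnalysisError:
        return True
    return False


# Positive case: every metric is an aggregate.
print(raises_analysis_error(["count", "max"]))
# Negative case: "upper" is a scalar function, so validation must fail.
print(raises_analysis_error(["count", "upper"]))
```

A real negative test would submit the plan to the server and assert on the exception it returns, rather than on a local name check.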



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -179,6 +183,29 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
       .build()
   }
 
+  def sendObservedMetricsToResponse(

Review Comment:
   ```suggestion
     private def sendObservedMetricsToResponse(
   ```
   
   doc?





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1095762825


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -45,6 +45,7 @@
     UnresolvedRegex,
 )
 from pyspark.sql.connect.functions import col, lit
+from pyspark.sql import Observation

Review Comment:
   https://github.com/apache/spark/pull/39091#discussion_r1073397082





[GitHub] [spark] grundprinzip commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1374939250

   Hi @beliefer,
   
   When you're ready for another round of review, I would suggest resolving the comments that you think you have addressed; otherwise it's going to be very hard to understand the current status of the PR.
   
   In particular, the discussion on the `isObservation` flag in the proto message needs to be addressed so that the message can be simplified.
   
   Thanks a lot for your contributions!




[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1371935451

   ping @hvanhovell @HyukjinKwon 




[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1356003824

   > @beliefer thanks for working on this. I have one question: how are we going to get the observed metrics to the client? This seems to be missing from the implementation. One approach would be to send them in a similar way to the metrics in the result code path.
   
   Good question. The results of a dataset can be passed back by the gRPC server, but the `ObservationListener` runs on the server, so it seems we need another way to get the observed metrics.




[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055270233


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala:
##########
@@ -619,6 +619,50 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan1, sparkPlan1)
   }
 
+  test("Test observe") {

Review Comment:
   This test only checks the protocol message. I can add end-to-end tests as well.





[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1368190192

   ping @hvanhovell @grundprinzip @zhengruifeng @HyukjinKwon 




[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1073397082


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   > We don't need Observation here. We just need to send the observed metrics as part of the response stream.
   
   But we should keep the behavior of the Spark Connect API consistent with the PySpark API. PySpark's `observe` accepts an `Observation` as a parameter, and the doctest checks this consistency.
   
   Maybe the Connect API could keep supporting `Observation` without using it on the server side, and use `CollectMetrics` directly instead.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1125777299


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   @hvanhovell `is_observation` has been removed.





[GitHub] [spark] hvanhovell commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1455327845

   Merging to master/3.4




[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055998439


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   OK. Let's only support the string.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   OK. I got it.





[GitHub] [spark] zhengruifeng commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1415786451

   @grundprinzip @hvanhovell Would you mind elaborating on the necessary changes so that we can get it into 3.4?
   
   There were too many comments to follow.




[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1359204590

   > I don't have enough experience to judge whether another full round trip to the server is worth it for that. Can we experiment for now with just returning them immediately? The observed metrics should be relatively small, so it should not be a big deal.
   
   This is done.




[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055282372


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      repeated string values = 2;

Review Comment:
   I'm not sure I understand, can you please expand your answer a bit?





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055994212


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   OK. I got it.





[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1058070997


##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -907,6 +907,38 @@ def test_random_split(self):
             self.assert_eq(relations[i].toPandas(), datasets[i].toPandas())
             i += 1
 
+    def test_observe(self):

Review Comment:
   Please add tests for the two assertions you're adding in dataframe.py





[GitHub] [spark] hvanhovell closed pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`
URL: https://github.com/apache/spark/pull/39091




[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1455360592

   @hvanhovell @grundprinzip @HyukjinKwon @zhengruifeng @amaliujia Thank you.




[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055510989


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  optional bool is_observation = 4;

Review Comment:
   So I have checked the code for how DF.observe works. In Scala it has two different overloads, one for Observation and one for string. Both end up calling the same underlying method on the dataframe. Both end up using the `CollectMetrics` and wrap it around the logical plan.
   
   There is no need to have this special type for using the Observation. The simplification for Observation should be created on the client side.
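
A minimal sketch of what handling `Observation` on the client side could look like, assuming the server only ever sees a named `CollectMetrics` relation and returns the observed values keyed by that name. The names `ClientObservation`, `fill_from_response`, and `metrics_name` are hypothetical and only serve the illustration; they are not the actual PySpark Connect API:

```python
import uuid
from typing import Any, Dict, Optional, Union


class ClientObservation:
    """Hypothetical client-side stand-in for Observation."""

    def __init__(self, name: Optional[str] = None) -> None:
        # Assumption: generate a random name when none is given.
        self.name = name if name is not None else str(uuid.uuid4())
        self._metrics: Optional[Dict[str, Any]] = None

    def fill_from_response(self, observed: Dict[str, Dict[str, Any]]) -> None:
        # `observed` maps a metrics name to its values, as decoded
        # from the response by the client.
        if self.name in observed:
            self._metrics = dict(observed[self.name])

    @property
    def get(self) -> Dict[str, Any]:
        if self._metrics is None:
            raise RuntimeError("no metrics observed yet; run an action first")
        return self._metrics


def metrics_name(observation: Union[str, ClientObservation]) -> str:
    """Both overloads reduce to a plain name for the CollectMetrics relation."""
    return observation if isinstance(observation, str) else observation.name
```

With this shape the proto message would only need the name and the metric expressions; whether the caller passed a string or an `Observation` stays a client concern.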





[GitHub] [spark] grundprinzip commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1358101112

   > 2\. Create a new relation `GetObservation` to get the `Observation` from the cache with a timeout. If the metrics are retrieved successfully, wrap them in a local relation and return it to the client.
   
   Today we simply stuff the metrics into the `pdf['attrs']` property of the pandas DataFrame. I'm wondering if we can just do the same here.
   
   I don't have enough experience to judge whether another full round trip to the server is worth it for that. Can we experiment for now with just returning them immediately? The observed metrics should be relatively small, so it should not be a big deal.
   




[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1357520476

   > I think it would be possible to add another result batch type for observed metrics and simply pass them at the end.
   
   I have an idea:
   1. Cache the `Observation` on the server.
   2. Create a new relation `GetObservation` to get the `Observation` from the cache with a timeout. If the metrics are retrieved successfully, wrap them in a local relation and return it to the client.




[GitHub] [spark] grundprinzip commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1356677876

   > > @beliefer thanks for working on this. I have one question: how are we going to get the observed metrics to the client? This seems to be missing from the implementation. One approach would be to send them the same way the metrics are sent in the result code path.
   > 
   > Good question. The query results can be passed back by the gRPC server, but the `ObservationListener` runs on the server, so it seems we need another way to get the observed metrics to the client.
   
   I think it would be possible to add another result batch type for observed metrics and simply pass them at the end.
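
A rough sketch of that idea, assuming the observed metrics only become available once the query has finished: the result batches are streamed first, and one trailing message carries the observed metrics. `Response`, `execute`, and `collect` are hypothetical stand-ins for illustration, not the real `ExecutePlanResponse` or the stream handler:

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Iterator, List, Optional, Tuple


@dataclass
class Response:
    """Hypothetical stand-in for one message of the response stream."""

    batch: Optional[bytes] = None
    observed_metrics: Dict[str, List[str]] = field(default_factory=dict)


def execute(batches: Iterable[bytes], observed: Dict[str, List[str]]) -> Iterator[Response]:
    # Stream the result batches first.
    for b in batches:
        yield Response(batch=b)
    # Then send one trailing message with the observed metrics, if there are any.
    if observed:
        yield Response(observed_metrics=observed)


def collect(stream: Iterable[Response]) -> Tuple[List[bytes], Dict[str, List[str]]]:
    # Client side: separate data batches from observed metrics.
    batches: List[bytes] = []
    observed: Dict[str, List[str]] = {}
    for r in stream:
        if r.batch is not None:
            batches.append(r.batch)
        observed.update(r.observed_metrics)
    return batches, observed
```

The client needs no second request in this shape; it reads the metrics off the same stream it is already consuming.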




[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1365847958

   ping @grundprinzip 




[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1060490793


##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -907,6 +907,38 @@ def test_random_split(self):
             self.assert_eq(relations[i].toPandas(), datasets[i].toPandas())
             i += 1
 
+    def test_observe(self):

Review Comment:
   Done





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1061134797


##########
python/pyspark/sql/connect/plan.py:
##########
@@ -924,6 +924,55 @@ def plan(self, session: "SparkConnectClient") -> proto.Relation:
         return plan
 
 
+class CollectMetrics(LogicalPlan):
+    """Logical plan object for a CollectMetrics operation."""
+
+    def __init__(
+        self,
+        child: Optional["LogicalPlan"],
+        name: str,
+        exprs: List["ColumnOrName"],
+    ) -> None:
+        super().__init__(child)
+        self._name = name
+        self._exprs = exprs
+
+    def col_to_expr(self, col: "ColumnOrName", session: "SparkConnectClient") -> proto.Expression:
+        if isinstance(col, Column):
+            return col.to_plan(session)
+        else:
+            return self.unresolved_attr(col)
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        assert self._child is not None
+
+        plan = proto.Relation()
+        plan.collect_metrics.input.CopyFrom(self._child.plan(session))
+        plan.collect_metrics.name = self._name
+        plan.collect_metrics.metrics.extend([self.col_to_expr(x, session) for x in self._exprs])
+        return plan
+
+    def print(self, indent: int = 0) -> str:

Review Comment:
   We don't need `print` and `_repr_html_` now.



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -158,6 +159,9 @@ message ExecutePlanResponse {
   // batch of results and then represent the overall state of the query execution.
   Metrics metrics = 4;
 
+  // The metrics observed during the execution of the query plan.
+  optional ObservedMetrics observed_metrics = 5;

Review Comment:
   I think this should be `repeated`? We may have multiple observations in one plan:
   
   ```
   In [10]: observation1 = Observation("my metrics 1")
   
   In [11]: observation2 = Observation("my metrics 2")
   
   In [12]: observed_df = df.observe(observation1, count(lit(1)).alias("count"), max(col("age"))).withColumn("xyz", lit(1)).observe(observation2, count(lit(1)).alias("count"), max(col("name")))
   
   In [13]: observed_df.count()
   Out[13]: 2
   
   In [14]: observation1.get
   Out[14]: {'count': 2, 'max(age)': 5}
   
   In [15]: observation2.get
   Out[15]: {'count': 2, 'max(name)': 'Bob'}
   
   In [16]: observed_df.explain()
   == Physical Plan ==
   CollectMetrics my metrics 2, [count(1) AS count#42L, max(name#1) AS max(name)#44]
   +- *(2) Project [age#0L, name#1, 1 AS xyz#37]
      +- CollectMetrics my metrics 1, [count(1) AS count#34L, max(age#0L) AS max(age)#36L]
         +- *(1) Scan ExistingRDD[age#0L,name#1]
   ```
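
To illustrate why the response has to carry more than one set of observed metrics, here is a small sketch that dispatches a list of named entries back to their observations, using the values from the session above. `ObservedEntry` and `by_observation` are hypothetical names, and the `keys` field is an assumption made for readability; the proto in this PR only carries a name and the values:

```python
from typing import Dict, List, NamedTuple


class ObservedEntry(NamedTuple):
    """Hypothetical stand-in for one element of a repeated observed-metrics field."""

    name: str
    keys: List[str]
    values: List[object]


def by_observation(entries: List[ObservedEntry]) -> Dict[str, Dict[str, object]]:
    # One dictionary of metric values per observation name.
    return {e.name: dict(zip(e.keys, e.values)) for e in entries}


entries = [
    ObservedEntry("my metrics 1", ["count", "max(age)"], [2, 5]),
    ObservedEntry("my metrics 2", ["count", "max(name)"], [2, "Bob"]),
]
result = by_observation(entries)
```

A single non-repeated entry could only hold one of the two names, so one of the observations would never receive its values.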





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1061202835


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -158,6 +159,9 @@ message ExecutePlanResponse {
   // batch of results and then represent the overall state of the query execution.
   Metrics metrics = 4;
 
+  // The metrics observed during the execution of the query plan.
+  optional ObservedMetrics observed_metrics = 5;

Review Comment:
   Yes, but let `ObservedMetricsObject` be `repeated`.





[GitHub] [spark] hvanhovell commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1125288854


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   Please remove the `is_observation` code path.





[GitHub] [spark] hvanhovell commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1125286902


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +185,17 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  // This message is only used by dataframe.observe if the observation is Observation.
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {

Review Comment:
   This name is a bit awkward. If we go with the repeated approach (see the comment above), then just name this `ObservedMetrics`.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1058301114


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,18 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  // This message is only used by dataframe.observe if the observation is Observation.
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      DataType schema = 2;
+      repeated string values = 3;

Review Comment:
   Good idea.





[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055281974


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   Can you please give me an example of where we're exposing the catalyst operator directly in our API (in particular in the Dataset API)?





[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055515097


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -179,6 +184,29 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
       .build()
   }
 
+  private def sendObservedMetricsToResponse(
+      clientId: String,
+      observedMetrics: Map[String, Row]): ExecutePlanResponse = {
+    val metricsObjects = observedMetrics.map { case (name, row) =>
+      val cols = (0 until row.length).map(row(_).toString)
+      ExecutePlanResponse.ObservedMetrics.ObservedMetricsObject
+        .newBuilder()
+        .setName(name)
+        .addAllValues(cols.asJava)
+        .build()
+    }
+    // Send a last batch with the observed metrics

Review Comment:
   ```suggestion
       // Prepare a response with the observed metrics.
   ```
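
The per-row conversion in this hunk can be pictured with a small Python sketch. It is only an illustration under assumptions: `observed_metrics_to_payload` is a hypothetical helper, plain dicts stand in for the `ObservedMetricsObject` proto builder, and tuples stand in for `Row`.

```python
from typing import Any, Dict, List, Sequence


def observed_metrics_to_payload(
    observed: Dict[str, Sequence[Any]],
) -> List[Dict[str, Any]]:
    """Turn {metric name -> row of values} into name/values records.

    Every value is rendered with str(), mirroring the row(_).toString
    call in the quoted Scala hunk. The dict layout is an assumption made
    for illustration; it is not the generated proto class.
    """
    return [
        {"name": name, "values": [str(value) for value in row]}
        for name, row in observed.items()
    ]


payload = observed_metrics_to_payload({"stats": (3, 1.5)})
```

Because every value is flattened with `str`, the receiver cannot recover the original types from the strings alone; another revision of the proto quoted in this thread carries a `schema` field next to `values`.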





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055337443


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala:
##########
@@ -619,6 +619,50 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan1, sparkPlan1)
   }
 
+  test("Test observe") {

Review Comment:
   OK





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051733557


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.

Review Comment:
   ```suggestion
   
   ```



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.
+        exprs : :class:`Column`
+            column expressions (:class:`Column`).
+
+        Returns
+        -------
+        :class:`DataFrame`
+            the observed :class:`DataFrame`.
+
+        Notes
+        -----
+        When ``observation`` is :class:`Observation`, this method only supports batch queries.
+        When ``observation`` is a string, this method works for both batch and streaming queries.
+        Continuous execution is currently not supported yet.
+        """
+        from pyspark.sql import Observation
+
+        if len(exprs) == 0:
+            raise ValueError("'exprs' should not be empty")
+        if not all(isinstance(c, Column) for c in exprs):
+            raise ValueError("all 'exprs' should be Column")
+
+        if isinstance(observation, Observation):
+            return DataFrame.withPlan(
+                plan.CollectMetrics(self._plan, str(observation._name), list(exprs), True),

Review Comment:
   Is the implementation in PySpark equivalent to this? https://github.com/apache/spark/blob/c014fa2e18713f67deb072a4336286f4a3f4d3f4/python/pyspark/sql/observation.py#L86-L110



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0

Review Comment:
   ```suggestion
           .. versionadded:: 3.4.0
   ```



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        observation : :class:`Observation` or str
+            `str` to specify the name, or an :class:`Observation` instance to obtain the metric.
+
+            .. versionchanged:: 3.4.0
+               Added support for `str` in this parameter.
+        exprs : :class:`Column`
+            column expressions (:class:`Column`).
+
+        Returns
+        -------
+        :class:`DataFrame`
+            the observed :class:`DataFrame`.
+
+        Notes
+        -----
+        When ``observation`` is :class:`Observation`, this method only supports batch queries.
+        When ``observation`` is a string, this method works for both batch and streaming queries.

Review Comment:
   `streaming queries` is out of scope now





[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055183690


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -45,6 +45,7 @@
     UnresolvedRegex,
 )
 from pyspark.sql.connect.functions import col, lit
+from pyspark.sql import Observation

Review Comment:
   You can't use it for type annotations here if it's not legal to construct the type. In addition you're using it to access the name IIRC. 





[GitHub] [spark] hvanhovell commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1125287159


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  optional bool is_observation = 4;

Review Comment:
   Yeah you should be able to remove this.





[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1383316479

   ping @hvanhovell 




[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1376762870

   It seems the failure is unrelated to this PR.




[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051302541


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala:
##########
@@ -126,6 +126,20 @@ package object dsl {
           Expression.UnresolvedFunction.newBuilder().setFunctionName("min").addArguments(e))
         .build()
 
+    def proto_max(e: Expression): Expression =

Review Comment:
   I just followed the existing `proto_min`.






[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051302794


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   `Observation` registers an `ObservationListener` on the `ExecutionListenerManager`.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055997117


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  optional bool is_observation = 4;

Review Comment:
   If we do not use `Observation`, the Connect API will not be consistent with the PySpark API.
   cc @cloud-fan 





[GitHub] [spark] hvanhovell commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051008772


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   What is the difference between the code paths?





[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1360966070

   ping @hvanhovell @grundprinzip @zhengruifeng @amaliujia 




[GitHub] [spark] hvanhovell commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1428036668

   @beliefer will take a look today. Thanks for your hard work and patience!




[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1375067246

   > In particular, the discussion on the `isObservation` flag in the proto message needs to be addressed to simplify.
   
   Hi @grundprinzip. I did try removing `Observation` as a parameter of `observe`, but that caused a problem: if the Connect `observe` API does not accept an `Observation`, it cannot pass the doctest, which reuses the examples from the documentation of PySpark's `observe` API.
   




[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055975763


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  optional bool is_observation = 4;

Review Comment:
   When `Observation` is used, `DataFrame.observe` caches the plan, so the behavior is different. The flag tells the server side whether to cache the plan.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055997117


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Sever side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  optional bool is_observation = 4;

Review Comment:
   OK





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055224132


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      repeated string values = 2;

Review Comment:
   Not the same.





[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055538038


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   This explanation is not fully correct. The `Observation` class uses event listeners to be able to fetch the metrics as soon as they appear without waiting for the DS command to finish. Since we're not having an event listener at this point in time it's not the same thing. Please simplify the PR accordingly.
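
The listener-based behavior described in this comment can be pictured with a toy model in plain Python. This is a sketch under assumptions, not Spark's implementation: `ToyListenerBus` and `ToyObservation` are hypothetical names, and the bus only stands in for a listener manager that fires once a query has produced its metrics.

```python
import threading
from typing import Callable, Dict, List, Optional

Metrics = Dict[str, object]
Listener = Callable[[Dict[str, Metrics]], None]


class ToyListenerBus:
    """Hypothetical stand-in for a query-execution listener manager."""

    def __init__(self) -> None:
        self._listeners: List[Listener] = []

    def register(self, listener: Listener) -> None:
        self._listeners.append(listener)

    def unregister(self, listener: Listener) -> None:
        self._listeners.remove(listener)

    def post_query_finished(self, observed: Dict[str, Metrics]) -> None:
        # Iterate over a copy so listeners may unregister themselves.
        for listener in list(self._listeners):
            listener(observed)


class ToyObservation:
    """Captures the metrics registered under `name` when they are posted."""

    def __init__(self, name: str, bus: ToyListenerBus) -> None:
        self.name = name
        self._bus = bus
        self._ready = threading.Event()
        self._metrics: Optional[Metrics] = None
        bus.register(self._on_finish)

    def _on_finish(self, observed: Dict[str, Metrics]) -> None:
        if self.name in observed:
            self._metrics = observed[self.name]
            self._bus.unregister(self._on_finish)
            self._ready.set()

    def get(self, timeout: Optional[float] = None) -> Metrics:
        # Block until the listener has seen metrics for this observation.
        if not self._ready.wait(timeout):
            raise TimeoutError(f"no metrics observed for {self.name!r}")
        assert self._metrics is not None
        return self._metrics


bus = ToyListenerBus()
obs = ToyObservation("my_metrics", bus)
bus.post_query_finished({"other": {"cnt": 1}})       # ignored: different name
bus.post_query_finished({"my_metrics": {"cnt": 3}})  # captured
```

Without such a listener on the Connect server, the metrics would presumably have to travel back with the query response instead, as suggested earlier in this thread.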





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055975763


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Server side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  optional bool is_observation = 4;

Review Comment:
   When an `Observation` is used, `DF.observe` caches the plan, so the behavior is different. The flag tells the server side whether to cache the plan.





[GitHub] [spark] hvanhovell commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055797044


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -338,6 +340,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {

Review Comment:
   We don't need Observation here. We just need to send the observed metrics as part of the response stream.
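
A rough sketch of what "send the observed metrics as part of the response stream" could look like, in plain Python rather than gRPC. `ResponseSketch`, `execute_plan` and `collect` are hypothetical names for illustration only and do not mirror the actual `ExecutePlanResponse` handling; the assumption is simply that result batches are streamed first and the observed metrics ride on a final message once the query has finished.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Metrics = Dict[str, Dict[str, object]]


@dataclass
class ResponseSketch:
    """Hypothetical response message: either a row batch or observed metrics."""

    rows: List[tuple] = field(default_factory=list)
    observed_metrics: Metrics = field(default_factory=dict)


def execute_plan(
    batches: Iterable[List[tuple]],
    observed: Callable[[], Metrics],
) -> Iterator[ResponseSketch]:
    # Stream the result batches first.
    for batch in batches:
        yield ResponseSketch(rows=batch)
    # Observed metrics are only complete after the last batch,
    # so they are read here and sent in one trailing message.
    yield ResponseSketch(observed_metrics=observed())


def collect(responses: Iterable[ResponseSketch]) -> Tuple[List[tuple], Metrics]:
    """Client side: gather rows and metrics from a single stream."""
    rows: List[tuple] = []
    metrics: Metrics = {}
    for response in responses:
        rows.extend(response.rows)
        metrics.update(response.observed_metrics)
    return rows, metrics


rows, metrics = collect(
    execute_plan(
        [[(1,), (2,)], [(3,)]],
        lambda: {"my_metrics": {"cnt": 3}},
    )
)
```

With this shape the client needs no second RPC and the server keeps no per-query state after the stream ends.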





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055231316


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   Many other APIs expose the operator directly.





[GitHub] [spark] hvanhovell commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055992205


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   I am sorry, but where is the logical plan cached in all of this?





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055990134


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   The branch above caches the logical plan, but the branch below does not.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055986739


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      repeated string values = 2;

Review Comment:
   A metric may have multiple different values.
   I will add the schema.





[GitHub] [spark] hvanhovell commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055795321


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      repeated string values = 2;

Review Comment:
   Why use values to represent the data? You can just use literals. We may need to add a schema as well.
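
To illustrate the difference between string values and typed literals with a schema, here is a small plain-Python sketch. `LiteralSketch`, `ObservedMetricsSketch` and `to_literal` are hypothetical names, and the type labels are assumptions for illustration; none of this is the proto definition that the PR ended up with.

```python
from dataclasses import dataclass
from typing import Dict, List, Union

Value = Union[bool, int, float, str]


@dataclass(frozen=True)
class LiteralSketch:
    """Hypothetical typed literal: the value travels with its data type."""

    data_type: str
    value: Value


@dataclass(frozen=True)
class ObservedMetricsSketch:
    """One named observation: keys act as the schema for the values."""

    name: str
    keys: List[str]
    values: List[LiteralSketch]

    def as_dict(self) -> Dict[str, Value]:
        return {key: lit.value for key, lit in zip(self.keys, self.values)}


def to_literal(value: Value) -> LiteralSketch:
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return LiteralSketch("boolean", value)
    if isinstance(value, int):
        return LiteralSketch("long", value)
    if isinstance(value, float):
        return LiteralSketch("double", value)
    if isinstance(value, str):
        return LiteralSketch("string", value)
    raise TypeError(f"unsupported metric value: {value!r}")


observed = ObservedMetricsSketch(
    name="my_metrics",
    keys=["cnt", "avg_len"],
    values=[to_literal(3), to_literal(2.5)],
)
decoded = observed.as_dict()
```

With `repeated string values`, the client would see `"3"` and `"2.5"` and have to guess the types; with literals the type information survives the round trip.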





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1055270233


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala:
##########
@@ -619,6 +619,50 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan1, sparkPlan1)
   }
 
+  test("Test observe") {

Review Comment:
   This test only tests the protocol message. I can add these cases to the end-to-end tests.





[GitHub] [spark] hvanhovell commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1355312686

   @beliefer thanks for working on this. I have one question: how are we going to get the observed metrics to the client? This seems to be missing from the implementation. One approach would be to send them in a similar way to the metrics in the result code path.




[GitHub] [spark] amaliujia commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051069651


##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -891,6 +892,75 @@ def to_jcols(
 
     melt = unpivot
 
+    def observe(
+        self,
+        observation: Union["Observation", str],
+        *exprs: Column,
+    ) -> "DataFrame":
+        """Define (named) metrics to observe on the DataFrame. This method returns an 'observed'
+        DataFrame that returns the same result as the input, with the following guarantees:
+
+        * It will compute the defined aggregates (metrics) on all the data that is flowing through
+            the Dataset at that point.
+
+        * It will report the value of the defined aggregate columns as soon as we reach a completion
+            point. A completion point is either the end of a query (batch mode) or the end of a
+            streaming epoch. The value of the aggregates only reflects the data processed since
+            the previous completion point.
+
+        The metrics columns must either contain a literal (e.g. lit(42)), or should contain one or
+        more aggregate functions (e.g. sum(a) or sum(a + b) + avg(c) - lit(1)). Expressions that
+        contain references to the input Dataset's columns must always be wrapped in an aggregate
+        function.
+
+        A user can observe these metrics by adding
+        Python's :class:`~pyspark.sql.streaming.StreamingQueryListener`,
+        Scala/Java's ``org.apache.spark.sql.streaming.StreamingQueryListener`` or Scala/Java's
+        ``org.apache.spark.sql.util.QueryExecutionListener`` to the spark session.
+
+        .. versionadded:: 3.3.0

Review Comment:
   3.4.0?





[GitHub] [spark] amaliujia commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement DataFrame.observe

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1051068680


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala:
##########
@@ -126,6 +126,20 @@ package object dsl {
           Expression.UnresolvedFunction.newBuilder().setFunctionName("min").addArguments(e))
         .build()
 
+    def proto_max(e: Expression): Expression =

Review Comment:
   Is there a need to add the `proto_` prefix? Why not just call it `max`?





[GitHub] [spark] zhengruifeng commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1061150381


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -158,6 +159,9 @@ message ExecutePlanResponse {
   // batch of results and then represent the overall state of the query execution.
   Metrics metrics = 4;
 
+  // The metrics observed during the execution of the query plan.
+  optional ObservedMetrics observed_metrics = 5;

Review Comment:
   My bad, `ObservedMetrics` can contain multiple observations.





[GitHub] [spark] beliefer commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1125655121


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
   // The Server side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) The identity whether Observation are used.
+  optional bool is_observation = 4;

Review Comment:
   OK





[GitHub] [spark] beliefer commented on pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on PR #39091:
URL: https://github.com/apache/spark/pull/39091#issuecomment-1427479417

   ping @hvanhovell Could you review again?

