Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/21 20:03:26 UTC

[GitHub] [spark] grundprinzip commented on a diff in pull request #39091: [SPARK-41527][CONNECT][PYTHON] Implement `DataFrame.observe`

grundprinzip commented on code in PR #39091:
URL: https://github.com/apache/spark/pull/39091#discussion_r1054795195


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -372,6 +374,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+    val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
+      Column(transformExpression(expr))
+    }
+
+    if (rel.getIsObservation) {
+      val observation = Observation(rel.getName)
+      Dataset
+        .ofRows(session, transformRelation(rel.getInput))
+        .observe(observation, metrics.head, metrics.tail: _*)
+        .logicalPlan
+    } else {
+      CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))

Review Comment:
   Does this branch actually make sense? Looking at the way this class is used, it seems that only `.observe` ever creates a `CollectMetrics` instance, so I'm not sure we actually expose this operator on its own.
   
   @hvanhovell what's your perspective?
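   
   For context, a minimal sketch (local session and names assumed, not taken from this PR) of the one public path that creates a `CollectMetrics` node in vanilla Spark:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{count, lit, sum}
   
   val spark = SparkSession.builder().master("local[*]").getOrCreate()
   val df = spark.range(100).toDF("id")
   
   // Dataset.observe is the public entry point; it wraps the child plan in a
   // CollectMetrics(name, metrics, child) logical node.
   val observed = df.observe("stats", count(lit(1)).as("rows"), sum(df("id")).as("total"))
   observed.collect() // metrics are reported once the action completes
   ```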



##########
python/pyspark/sql/connect/dataframe.py:
##########
@@ -45,6 +45,7 @@
     UnresolvedRegex,
 )
 from pyspark.sql.connect.functions import col, lit
+from pyspark.sql import Observation

Review Comment:
   I'm not sure this is going to work. The tricky part is that the `Observation` instance depends heavily on the `_jvm` object, so I'm worried about using the type here: it will not even be possible to construct one when using Spark Connect.
   
   There are two ways out here:
   
   1) Use a plain string for now and add the Observation class as a follow-up.
   2) Add an Observation class now.
   
   Personally, to get this PR moving, I would vote for 1.
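   
   For reference, the JVM-side API that PySpark's `Observation` wraps through `_jvm` (a sketch with an assumed local session), which is exactly why it cannot even be constructed without a JVM:
   
   ```scala
   import org.apache.spark.sql.{Observation, SparkSession}
   import org.apache.spark.sql.functions.{count, lit}
   
   val spark = SparkSession.builder().master("local[*]").getOrCreate()
   val obs = Observation("my_metrics") // backed by a JVM-side query listener
   
   val df = spark.range(10).observe(obs, count(lit(1)).as("rows"))
   df.collect() // the action triggers metric collection
   
   // Blocks until the metrics for the observed action are available.
   val metrics: Map[String, Any] = obs.get // e.g. Map("rows" -> 10)
   ```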
   
   
   



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -158,6 +158,9 @@ message ExecutePlanResponse {
   // batch of results and then represent the overall state of the query execution.
   Metrics metrics = 4;
 
+  // The metrics observed during the execution of the query plan.
+  ObservedMetrics observed_metrics = 5;

Review Comment:
   ```suggestion
     optional ObservedMetrics observed_metrics = 5;
   ```



##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -598,3 +599,18 @@ message ToSchema {
  // The Server side will update the dataframe with this schema.
   DataType schema = 2;
 }
+
+// Collect arbitrary (named) metrics from a dataset.
+message CollectMetrics {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) Name of the metrics.
+  string name = 2;
+
+  // (Required) The metric sequence.
+  repeated Expression metrics = 3;
+
+  // (Optional) Indicates whether an Observation is used.
+  bool is_observation = 4;

Review Comment:
   What happens if this is false?
   ```suggestion
     optional bool is_observation = 4;
   ```
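   
   For context on the suggestion (a sketch; `rel` is the `proto.CollectMetrics` handled in the planner above, and the `has...` accessor follows standard proto3 Java codegen for `optional` fields): with a plain `bool`, an unset field is indistinguishable from an explicit `false`.
   
   ```scala
   // With `optional bool is_observation = 4;` the generated bindings expose a
   // presence bit, so the planner can tell "unset" apart from "false":
   if (rel.hasIsObservation && rel.getIsObservation) {
     // The client explicitly requested Observation semantics.
   } else {
     // Field unset (or explicitly false): plain CollectMetrics path.
   }
   ```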



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {
+
+    repeated ObservedMetricsObject metrics_objects = 1;
+
+    message ObservedMetricsObject {
+      string name = 1;
+      repeated string values = 2;

Review Comment:
   Is this equivalent to what we do for regular observations?
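   
   For comparison, a sketch of how regular observed metrics surface on the JVM side (listener wiring assumed, not from this PR): `QueryExecution.observedMetrics` yields one typed `Row` per named observation rather than a list of strings, so a `repeated string values` encoding would drop the types.
   
   ```scala
   import org.apache.spark.sql.{Row, SparkSession}
   import org.apache.spark.sql.execution.QueryExecution
   import org.apache.spark.sql.util.QueryExecutionListener
   
   val spark = SparkSession.builder().master("local[*]").getOrCreate()
   
   spark.listenerManager.register(new QueryExecutionListener {
     override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
       // One typed Row per named observation, e.g. "stats" -> [10, 4.5].
       val metrics: Map[String, Row] = qe.observedMetrics
       metrics.foreach { case (name, row) => println(s"$name -> $row") }
     }
     override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = ()
   })
   ```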



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -181,6 +184,16 @@ message ExecutePlanResponse {
       string metric_type = 3;
     }
   }
+
+  message ObservedMetrics {

Review Comment:
   Doc?



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala:
##########
@@ -619,6 +619,50 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan1, sparkPlan1)
   }
 
+  test("Test observe") {

Review Comment:
   Please add negative tests asserting that an `AnalysisException` is thrown when non-aggregate functions are submitted.
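   
   A sketch of such a test in the style of this suite (test name and error-message fragment assumed; relies on the suite's existing imports for `AnalysisException` and `col`):
   
   ```scala
   test("Test observe with a non-aggregate expression") {
     val e = intercept[AnalysisException] {
       // `id + 1` is not an aggregate, so analysis should fail.
       spark.range(10).observe("stats", col("id") + 1).collect()
     }
     assert(e.getMessage.toLowerCase.contains("aggregate"))
   }
   ```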



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -179,6 +183,29 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
       .build()
   }
 
+  def sendObservedMetricsToResponse(

Review Comment:
   ```suggestion
     private def sendObservedMetricsToResponse(
   ```
   
   doc?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org