Posted to reviews@spark.apache.org by "HyukjinKwon (via GitHub)" <gi...@apache.org> on 2023/12/15 22:22:33 UTC

[PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

HyukjinKwon opened a new pull request, #44375:
URL: https://github.com/apache/spark/pull/44375

   ### What changes were proposed in this pull request?
   
   This PR proposes to support Python metrics in Python Data Source so that the metrics are reported the same way as for other Python execution and APIs.
   
   ### Why are the changes needed?
   
   The same metrics (https://github.com/apache/spark/pull/33559) should be shown when reading from a Python Data Source. This is the last missing part compared to other Python execution and APIs.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Python Data Source has not been released yet, so there is no end-user-facing change.
   It shows some new metrics in the UI.
   
   ### How was this patch tested?
   
   Unit tests were added, and the change was manually tested via the UI.
   
   TBD: screenshot
   TBD: unittests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1428616539


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##########
@@ -61,14 +61,12 @@ class ApplyInPandasWithStatePythonRunner(
     keySchema: StructType,
     outputSchema: StructType,
     stateValueSchema: StructType,
-    pyMetrics: Map[String, SQLMetric],
+    override val pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String])
   extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets, jobArtifactUUID)
   with PythonArrowInput[InType]
   with PythonArrowOutput[OutType] {
 
-  override val pythonMetrics: Option[Map[String, SQLMetric]] = Some(pyMetrics)

Review Comment:
   All of these changes are effectively a partial revert of https://github.com/apache/spark/pull/44305: we now support metrics, so there is no need to make them optional anymore.





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #44375:
URL: https://github.com/apache/spark/pull/44375#issuecomment-1858676603

   cc @cloud-fan and @allisonwang-db 
   
   cc @viirya too who actually added this IIRC :-).




Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1429445056


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new PartitionReader[InternalRow] {
-      private val outputIter = source.createPartitionReadIteratorInPython(
-        partition.asInstanceOf[PythonInputPartition],
-        pickledReadFunc,
-        outputSchema,
-        jobArtifactUUID)
+      // Dummy SQLMetrics. The result is manually reported via DSv2 interface
+      // via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
+      // is not used when it is reported. It is to reuse existing Python runner.
+      // See also `UserDefinedPythonDataSource.createPythonMetrics`.
+      private[this] val metrics: Map[String, SQLMetric] = {
+        PythonSQLMetrics.pythonSizeMetricsDesc.keys
+          .map(_ -> new SQLMetric("size", -1)).toMap ++

Review Comment:
   The code is a little tricky. In the DS v2 framework, the reader (which runs on the executor side) needs to update and report the current values of its metrics. To reuse existing code, we use `SQLMetric` here, and its value is updated within `createMapInBatchEvaluatorFactory` (which calls `MapInBatchEvaluatorFactory`). We then take the value from `SQLMetric` and report it via the DS v2 framework in `currentMetricsValues`.
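
The flow described in this comment can be sketched in isolation. This is a minimal illustration only, not the PR's code: `SimpleMetric`, `TaskMetric`, `SnapshotMetric`, and `SketchReader` are hypothetical stand-ins (loosely playing the roles of `SQLMetric`, `CustomTaskMetric`, and the partition reader), and the metric names are made up for the example.

```scala
// Hypothetical stand-in for an accumulator-style metric; not Spark's SQLMetric.
final class SimpleMetric(val metricType: String) {
  private var current: Long = 0L
  def add(v: Long): Unit = current += v
  def value: Long = current
}

// Hypothetical stand-in for the per-task metric interface of the reporting framework.
trait TaskMetric {
  def name: String
  def value: Long
}

final case class SnapshotMetric(name: String, value: Long) extends TaskMetric

final class SketchReader(rows: Iterator[String]) {
  // Accumulators that the reused processing code updates as a side effect.
  private val metrics: Map[String, SimpleMetric] = Map(
    "numOutputRows" -> new SimpleMetric("sum"),
    "dataSize" -> new SimpleMetric("size"))

  // Stand-in for the reused runner: passes rows through and bumps the metrics.
  private val outputIter: Iterator[String] = rows.map { r =>
    metrics("numOutputRows").add(1L)
    metrics("dataSize").add(r.length.toLong)
    r
  }

  def next(): Boolean = outputIter.hasNext
  def get(): String = outputIter.next()

  // Copy the current accumulator values into fresh task-metric objects on each call.
  def currentMetricsValues(): Array[TaskMetric] =
    metrics.iterator.map { case (k, m) => SnapshotMetric(k, m.value) }.toArray[TaskMetric]
}
```

The point of the sketch is the split of responsibilities: the accumulators exist only so the existing code path has something to update, while the values the framework sees are the snapshots taken in `currentMetricsValues`.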





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #44375: [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source
URL: https://github.com/apache/spark/pull/44375




Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1430833376


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new PartitionReader[InternalRow] {
-      private val outputIter = source.createPartitionReadIteratorInPython(
-        partition.asInstanceOf[PythonInputPartition],
-        pickledReadFunc,
-        outputSchema,
-        jobArtifactUUID)
+      // Dummy SQLMetrics. The result is manually reported via DSv2 interface
+      // via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
+      // is not used when it is reported. It is to reuse existing Python runner.
+      // See also `UserDefinedPythonDataSource.createPythonMetrics`.
+      private[this] val metrics: Map[String, SQLMetric] = {
+        PythonSQLMetrics.pythonSizeMetricsDesc.keys
+          .map(_ -> new SQLMetric("size", -1)).toMap ++
+        PythonSQLMetrics.pythonOtherMetricsDesc.keys
+          .map(_ -> new SQLMetric("sum", -1)).toMap
+      }
+
+      private val outputIter = {
+        val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
+          pickledReadFunc,
+          outputSchema,
+          metrics,
+          jobArtifactUUID)
+
+        val part = partition.asInstanceOf[PythonInputPartition]
+        evaluatorFactory.createEvaluator().eval(
+          part.index, Iterator.single(InternalRow(part.pickedPartition)))
+      }
 
       override def next(): Boolean = outputIter.hasNext
 
       override def get(): InternalRow = outputIter.next()
 
       override def close(): Unit = {}
+
+      override def currentMetricsValues(): Array[CustomTaskMetric] = {
+        source.createPythonTaskMetrics(metrics.map { case (k, v) => k -> v.value})
+      }
     }
   }
 }
 
+class PythonCustomMetric extends CustomMetric {
+  private var initName: String = _
+  private var initDescription: String = _
+  def initialize(n: String, d: String): Unit = {

Review Comment:
   Oh, actually I think I can just provide multiple constructors.
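
One way to read "multiple constructors" is a class whose primary constructor takes the values and which also keeps a zero-argument auxiliary constructor for callers that need one. The sketch below is an assumption about the shape, not the follow-up change itself; `SketchCustomMetric` and its parameter names are hypothetical.

```scala
// Hypothetical metric class; the method names mirror the diff, but this is not Spark's code.
class SketchCustomMetric(metricName: String, metricDescription: String) {
  // Auxiliary zero-argument constructor, kept for callers that instantiate without arguments.
  def this() = this(null, null)

  def name(): String = {
    assert(metricName != null, "name was never set")
    metricName
  }

  def description(): String = {
    assert(metricDescription != null, "description was never set")
    metricDescription
  }
}
```

With this shape, ordinary call sites can write `new SketchCustomMetric("n", "d")` and no longer need a separate `initialize` step, while `new SketchCustomMetric()` still compiles.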





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1429412650


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new PartitionReader[InternalRow] {
-      private val outputIter = source.createPartitionReadIteratorInPython(
-        partition.asInstanceOf[PythonInputPartition],
-        pickledReadFunc,
-        outputSchema,
-        jobArtifactUUID)
+      // Dummy SQLMetrics. The result is manually reported via DSv2 interface
+      // via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
+      // is not used when it is reported. It is to reuse existing Python runner.
+      // See also `UserDefinedPythonDataSource.createPythonMetrics`.
+      private[this] val metrics: Map[String, SQLMetric] = {
+        PythonSQLMetrics.pythonSizeMetricsDesc.keys
+          .map(_ -> new SQLMetric("size", -1)).toMap ++

Review Comment:
   Just to make sure I understand this part. Are these size metrics automatically updated by the DSv2 framework? 
   
   Also, is it possible to support user-defined Python metrics in the future?





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #44375:
URL: https://github.com/apache/spark/pull/44375#issuecomment-1862012383

   Made a PR to clean up.




Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1429446981


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new PartitionReader[InternalRow] {
-      private val outputIter = source.createPartitionReadIteratorInPython(
-        partition.asInstanceOf[PythonInputPartition],
-        pickledReadFunc,
-        outputSchema,
-        jobArtifactUUID)
+      // Dummy SQLMetrics. The result is manually reported via DSv2 interface
+      // via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
+      // is not used when it is reported. It is to reuse existing Python runner.
+      // See also `UserDefinedPythonDataSource.createPythonMetrics`.
+      private[this] val metrics: Map[String, SQLMetric] = {
+        PythonSQLMetrics.pythonSizeMetricsDesc.keys
+          .map(_ -> new SQLMetric("size", -1)).toMap ++
+        PythonSQLMetrics.pythonOtherMetricsDesc.keys
+          .map(_ -> new SQLMetric("sum", -1)).toMap
+      }
+
+      private val outputIter = {
+        val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
+          pickledReadFunc,
+          outputSchema,
+          metrics,
+          jobArtifactUUID)
+
+        val part = partition.asInstanceOf[PythonInputPartition]
+        evaluatorFactory.createEvaluator().eval(
+          part.index, Iterator.single(InternalRow(part.pickedPartition)))
+      }
 
       override def next(): Boolean = outputIter.hasNext
 
       override def get(): InternalRow = outputIter.next()
 
       override def close(): Unit = {}
+
+      override def currentMetricsValues(): Array[CustomTaskMetric] = {
+        source.createPythonTaskMetrics(metrics.map { case (k, v) => k -> v.value})
+      }
     }
   }
 }
 
+class PythonCustomMetric extends CustomMetric {
+  private var initName: String = _
+  private var initDescription: String = _
+  def initialize(n: String, d: String): Unit = {
+    initName = n
+    initDescription = d
+  }
+  override def name(): String = {
+    assert(initName != null)
+    initName
+  }
+  override def description(): String = {
+    assert(initDescription != null)
+    initDescription
+  }
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    SQLMetrics.stringValue("size", taskMetrics, Array.empty[Long])
+  }
+}
+
+class PythonCustomTaskMetric extends CustomTaskMetric {
+  private var initName: String = _
+  private var initValue: Long = -1L
+  def initialize(n: String, v: Long): Unit = {

Review Comment:
   ditto





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1430831406


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new PartitionReader[InternalRow] {
-      private val outputIter = source.createPartitionReadIteratorInPython(
-        partition.asInstanceOf[PythonInputPartition],
-        pickledReadFunc,
-        outputSchema,
-        jobArtifactUUID)
+      // Dummy SQLMetrics. The result is manually reported via DSv2 interface
+      // via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
+      // is not used when it is reported. It is to reuse existing Python runner.
+      // See also `UserDefinedPythonDataSource.createPythonMetrics`.
+      private[this] val metrics: Map[String, SQLMetric] = {
+        PythonSQLMetrics.pythonSizeMetricsDesc.keys
+          .map(_ -> new SQLMetric("size", -1)).toMap ++
+        PythonSQLMetrics.pythonOtherMetricsDesc.keys
+          .map(_ -> new SQLMetric("sum", -1)).toMap
+      }
+
+      private val outputIter = {
+        val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
+          pickledReadFunc,
+          outputSchema,
+          metrics,
+          jobArtifactUUID)
+
+        val part = partition.asInstanceOf[PythonInputPartition]
+        evaluatorFactory.createEvaluator().eval(
+          part.index, Iterator.single(InternalRow(part.pickedPartition)))
+      }
 
       override def next(): Boolean = outputIter.hasNext
 
       override def get(): InternalRow = outputIter.next()
 
       override def close(): Unit = {}
+
+      override def currentMetricsValues(): Array[CustomTaskMetric] = {
+        source.createPythonTaskMetrics(metrics.map { case (k, v) => k -> v.value})
+      }
     }
   }
 }
 
+class PythonCustomMetric extends CustomMetric {
+  private var initName: String = _
+  private var initDescription: String = _
+  def initialize(n: String, d: String): Unit = {
+    initName = n
+    initDescription = d
+  }
+  override def name(): String = {
+    assert(initName != null)
+    initName
+  }
+  override def description(): String = {
+    assert(initDescription != null)
+    initDescription
+  }
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    SQLMetrics.stringValue("size", taskMetrics, Array.empty[Long])
+  }
+}
+
+class PythonCustomTaskMetric extends CustomTaskMetric {
+  private var initName: String = _
+  private var initValue: Long = -1L

Review Comment:
   Yeah, I can rename it. Actually we should just say `metricValue` to make it less confusing.





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1429445850


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -179,11 +240,11 @@ case class UserDefinedPythonDataSource(dataSourceCls: PythonFunction) {
   /**
    * (Executor-side) Create an iterator that reads the input partitions.
    */
-  def createPartitionReadIteratorInPython(
-      partition: PythonInputPartition,
+  def createMapInBatchEvaluatorFactory(

Review Comment:
   Do we plan to reuse this method further? If not, we can make it return `Iterator[InternalRow]` to simplify the caller code.
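
The difference between the two shapes can be sketched with stand-in types. Everything here is hypothetical (`Evaluator`, `EvaluatorFactory`, `ReadSketch`, and the string rows are illustrative and are not Spark's classes); it only shows how returning the iterator moves the `createEvaluator().eval(...)` plumbing out of the caller.

```scala
// Hypothetical stand-ins for the evaluator types; not Spark's classes.
trait Evaluator[T, U] {
  def eval(partitionIndex: Int, input: Iterator[T]): Iterator[U]
}

trait EvaluatorFactory[T, U] {
  def createEvaluator(): Evaluator[T, U]
}

object ReadSketch {
  // Shape in the diff: the method returns a factory, so every caller must drive it.
  def createFactory(prefix: String): EvaluatorFactory[String, String] =
    new EvaluatorFactory[String, String] {
      override def createEvaluator(): Evaluator[String, String] =
        new Evaluator[String, String] {
          override def eval(partitionIndex: Int, input: Iterator[String]): Iterator[String] =
            input.map(p => s"$prefix-$partitionIndex-$p")
        }
    }

  // Suggested shape: hide the factory and hand back the iterator directly.
  def createReadIterator(prefix: String, index: Int, partition: String): Iterator[String] =
    createFactory(prefix).createEvaluator().eval(index, Iterator.single(partition))
}
```

A caller of the second method only needs `ReadSketch.createReadIterator("row", 3, "p")`, at the cost of the factory no longer being available for reuse elsewhere.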





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #44375:
URL: https://github.com/apache/spark/pull/44375#issuecomment-1858857944

   Merged to master




Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1429447534


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new PartitionReader[InternalRow] {
-      private val outputIter = source.createPartitionReadIteratorInPython(
-        partition.asInstanceOf[PythonInputPartition],
-        pickledReadFunc,
-        outputSchema,
-        jobArtifactUUID)
+      // Dummy SQLMetrics. The result is manually reported via DSv2 interface
+      // via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
+      // is not used when it is reported. It is to reuse existing Python runner.
+      // See also `UserDefinedPythonDataSource.createPythonMetrics`.
+      private[this] val metrics: Map[String, SQLMetric] = {
+        PythonSQLMetrics.pythonSizeMetricsDesc.keys
+          .map(_ -> new SQLMetric("size", -1)).toMap ++
+        PythonSQLMetrics.pythonOtherMetricsDesc.keys
+          .map(_ -> new SQLMetric("sum", -1)).toMap
+      }
+
+      private val outputIter = {
+        val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
+          pickledReadFunc,
+          outputSchema,
+          metrics,
+          jobArtifactUUID)
+
+        val part = partition.asInstanceOf[PythonInputPartition]
+        evaluatorFactory.createEvaluator().eval(
+          part.index, Iterator.single(InternalRow(part.pickedPartition)))
+      }
 
       override def next(): Boolean = outputIter.hasNext
 
       override def get(): InternalRow = outputIter.next()
 
       override def close(): Unit = {}
+
+      override def currentMetricsValues(): Array[CustomTaskMetric] = {
+        source.createPythonTaskMetrics(metrics.map { case (k, v) => k -> v.value})
+      }
     }
   }
 }
 
+class PythonCustomMetric extends CustomMetric {
+  private var initName: String = _
+  private var initDescription: String = _
+  def initialize(n: String, d: String): Unit = {
+    initName = n
+    initDescription = d
+  }
+  override def name(): String = {
+    assert(initName != null)
+    initName
+  }
+  override def description(): String = {
+    assert(initDescription != null)
+    initDescription
+  }
+  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
+    SQLMetrics.stringValue("size", taskMetrics, Array.empty[Long])
+  }
+}
+
+class PythonCustomTaskMetric extends CustomTaskMetric {
+  private var initName: String = _
+  private var initValue: Long = -1L

Review Comment:
   I think it's `currentValue`? We always create new instances of `PythonCustomTaskMetric` when reporting the DS v2 metrics. We can optimize this part though.





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1429446785


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new PartitionReader[InternalRow] {
-      private val outputIter = source.createPartitionReadIteratorInPython(
-        partition.asInstanceOf[PythonInputPartition],
-        pickledReadFunc,
-        outputSchema,
-        jobArtifactUUID)
+      // Dummy SQLMetrics. The result is manually reported via DSv2 interface
+      // via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
+      // is not used when it is reported. It is to reuse existing Python runner.
+      // See also `UserDefinedPythonDataSource.createPythonMetrics`.
+      private[this] val metrics: Map[String, SQLMetric] = {
+        PythonSQLMetrics.pythonSizeMetricsDesc.keys
+          .map(_ -> new SQLMetric("size", -1)).toMap ++
+        PythonSQLMetrics.pythonOtherMetricsDesc.keys
+          .map(_ -> new SQLMetric("sum", -1)).toMap
+      }
+
+      private val outputIter = {
+        val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
+          pickledReadFunc,
+          outputSchema,
+          metrics,
+          jobArtifactUUID)
+
+        val part = partition.asInstanceOf[PythonInputPartition]
+        evaluatorFactory.createEvaluator().eval(
+          part.index, Iterator.single(InternalRow(part.pickedPartition)))
+      }
 
       override def next(): Boolean = outputIter.hasNext
 
       override def get(): InternalRow = outputIter.next()
 
       override def close(): Unit = {}
+
+      override def currentMetricsValues(): Array[CustomTaskMetric] = {
+        source.createPythonTaskMetrics(metrics.map { case (k, v) => k -> v.value})
+      }
     }
   }
 }
 
+class PythonCustomMetric extends CustomMetric {
+  private var initName: String = _
+  private var initDescription: String = _
+  def initialize(n: String, d: String): Unit = {

Review Comment:
   The caller side always calls `initialize` right after instantiating `PythonCustomMetric`. Shall we just add a constructor with parameters?





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1430829500


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -124,21 +128,78 @@ class PythonPartitionReaderFactory(
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     new PartitionReader[InternalRow] {
-      private val outputIter = source.createPartitionReadIteratorInPython(
-        partition.asInstanceOf[PythonInputPartition],
-        pickledReadFunc,
-        outputSchema,
-        jobArtifactUUID)
+      // Dummy SQLMetrics. The result is manually reported via DSv2 interface
+      // via passing the value to `CustomTaskMetric`. Note that `pythonOtherMetricsDesc`
+      // is not used when it is reported. It is to reuse existing Python runner.
+      // See also `UserDefinedPythonDataSource.createPythonMetrics`.
+      private[this] val metrics: Map[String, SQLMetric] = {
+        PythonSQLMetrics.pythonSizeMetricsDesc.keys
+          .map(_ -> new SQLMetric("size", -1)).toMap ++
+        PythonSQLMetrics.pythonOtherMetricsDesc.keys
+          .map(_ -> new SQLMetric("sum", -1)).toMap
+      }
+
+      private val outputIter = {
+        val evaluatorFactory = source.createMapInBatchEvaluatorFactory(
+          pickledReadFunc,
+          outputSchema,
+          metrics,
+          jobArtifactUUID)
+
+        val part = partition.asInstanceOf[PythonInputPartition]
+        evaluatorFactory.createEvaluator().eval(
+          part.index, Iterator.single(InternalRow(part.pickedPartition)))
+      }
 
       override def next(): Boolean = outputIter.hasNext
 
       override def get(): InternalRow = outputIter.next()
 
       override def close(): Unit = {}
+
+      override def currentMetricsValues(): Array[CustomTaskMetric] = {
+        source.createPythonTaskMetrics(metrics.map { case (k, v) => k -> v.value})
+      }
     }
   }
 }
 
+class PythonCustomMetric extends CustomMetric {
+  private var initName: String = _
+  private var initDescription: String = _
+  def initialize(n: String, d: String): Unit = {

Review Comment:
   I couldn't, because it requires a 0-argument constructor:
    https://github.com/apache/spark/blob/8cd466136528e479514a1d8dfe8ecaba9d0f8cce/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala#L225-L227
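
The constraint can be illustrated without Spark, assuming the metric class is instantiated reflectively through its no-argument constructor, as the comment implies. All names below (`NamedMetric`, `LateInitMetric`, `EagerMetric`, `MetricLoader`) are hypothetical, and the sketch assumes it is compiled as an ordinary source file so that the classes are top-level.

```scala
// Hypothetical stand-in for a metric interface; not Spark's CustomMetric.
trait NamedMetric {
  def name(): String
}

// Usable by the loader below: it has a public zero-argument constructor,
// and the real value is supplied afterwards through initialize().
class LateInitMetric extends NamedMetric {
  private var initName: String = null
  def initialize(n: String): Unit = { initName = n }
  override def name(): String = {
    assert(initName != null, "initialize() must be called first")
    initName
  }
}

// Not usable by the loader below: its only constructor takes an argument.
class EagerMetric(n: String) extends NamedMetric {
  override def name(): String = n
}

object MetricLoader {
  // Mimics a framework that only knows the class and calls its no-arg constructor.
  def instantiate(cls: Class[_ <: NamedMetric]): NamedMetric =
    cls.getConstructor().newInstance()

  // getConstructor() throws NoSuchMethodException when no zero-argument constructor exists.
  def canInstantiate(cls: Class[_ <: NamedMetric]): Boolean =
    try { instantiate(cls); true }
    catch { case _: NoSuchMethodException => false }
}
```

Under that assumption, a class like `EagerMetric` cannot be created by the loader at all, which is why the diff uses a no-argument class plus `initialize` rather than a parameterized constructor alone.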





Re: [PR] [SPARK-46424][PYTHON][SQL] Support Python metrics in Python Data Source [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44375:
URL: https://github.com/apache/spark/pull/44375#discussion_r1430831565


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonDataSource.scala:
##########
@@ -179,11 +240,11 @@ case class UserDefinedPythonDataSource(dataSourceCls: PythonFunction) {
   /**
    * (Executor-side) Create an iterator that reads the input partitions.
    */
-  def createPartitionReadIteratorInPython(
-      partition: PythonInputPartition,
+  def createMapInBatchEvaluatorFactory(

Review Comment:
   👍 


