Posted to reviews@spark.apache.org by "LuciferYang (via GitHub)" <gi...@apache.org> on 2023/04/21 02:59:18 UTC

[GitHub] [spark] LuciferYang opened a new pull request, #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

LuciferYang opened a new pull request, #40892:
URL: https://github.com/apache/spark/pull/40892

   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173464572


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])

Review Comment:
   The deserialized result of `observedMetrics` has a problem now: the `schema` of `GenericRowWithSchema` is null.
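Why a null schema matters: as far as the `Row` API goes, looking up a field by name on a `GenericRowWithSchema` is resolved through its schema, so a row that only carries positional values cannot serve name-based access. The plain-Scala sketch below models that difference; `Field`, `SimpleRow` and `NullSchemaDemo` are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-in for a struct field (name + type name).
final case class Field(name: String, dataType: String)

// Hypothetical stand-in for a row that may or may not carry its schema.
final case class SimpleRow(values: Seq[Any], schema: Option[Seq[Field]]) {
  // Positional access works whether or not a schema is present.
  def get(i: Int): Any = values(i)

  // By-name access needs the schema to map a field name to a position.
  def getByName(name: String): Option[Any] =
    schema.flatMap { fields =>
      val idx = fields.indexWhere(_.name == name)
      if (idx >= 0) Some(values(idx)) else None
    }
}

object NullSchemaDemo {
  private val fields = Seq(Field("c1", "long"), Field("c2", "double"))
  // A row deserialized together with its schema.
  val withSchema: SimpleRow = SimpleRow(Seq[Any](1L, 3.0), Some(fields))
  // A row whose values were filled in but whose schema was left unset.
  val withoutSchema: SimpleRow = SimpleRow(Seq[Any](1L, 3.0), None)
}
```

With the schema, `getByName("c2")` finds the value; without it, only positional `get(1)` still works.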





[GitHub] [spark] LuciferYang commented on pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1550894768

   Thanks @HeartSaVioR @HyukjinKwon @rangadi ~ 
   
   I have already tested my new permissions in another PR :)
   
   




[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173259054


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])

Review Comment:
   `observedMetrics` may be null, so `@JsonSetter(nulls = Nulls.AS_EMPTY)` is added to set it to an empty map when it is null.
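`Nulls.AS_EMPTY` asks Jackson to substitute the property type's empty value when the JSON value is `null`, so callers never receive a null map. A plain-Scala analogue of that normalization is sketched below; `NullToEmpty`, `orEmpty` and `describe` are hypothetical helpers that do not use Jackson, and `describe` takes `Any` values only to keep the sketch free of Spark's `Row`.

```scala
import java.{util => ju}

object NullToEmpty {
  // Hypothetical helper mirroring the effect of @JsonSetter(nulls = Nulls.AS_EMPTY)
  // on a java.util.Map property: a null input becomes an empty map.
  def orEmpty[K, V](m: ju.Map[K, V]): ju.Map[K, V] =
    if (m == null) ju.Collections.emptyMap[K, V]() else m

  // Downstream code can call isEmpty/size without a separate null check.
  def describe(observedMetrics: ju.Map[String, Any]): String = {
    val safe = orEmpty(observedMetrics)
    if (safe.isEmpty) "no observed metrics" else s"${safe.size} observed metric(s)"
  }
}
```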





[GitHub] [spark] bogao007 commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "bogao007 (via GitHub)" <gi...@apache.org>.
bogao007 commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173392468


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])
+    extends Serializable {
+
+  /** The aggregate (across all sources) number of records processed in a trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum

Review Comment:
   Aggregated metrics like `numInputRows`, `inputRowsPerSecond` and `processedRowsPerSecond` are calculated inside this class. Can they be safely deserialized from the JSON string? It might also be worth adding them to the test case.
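One way to reason about this: if the aggregates are computed from `sources` rather than stored (as `numInputRows` is in the diff, via `sources.map(_.numInputRows).sum`), they only need `sources` to survive the JSON round trip. The sketch below uses hypothetical, trimmed-down `SourceStats` and `ProgressSketch` classes in place of `SourceProgress` and `StreamingQueryProgress`, and assumes the two rate metrics are likewise sums over `sources`; that assumption should be checked against the native class.

```scala
// Hypothetical stand-in for SourceProgress: only the fields the aggregates use.
final case class SourceStats(
    numInputRows: Long,
    inputRowsPerSecond: Double,
    processedRowsPerSecond: Double)

// Hypothetical stand-in for StreamingQueryProgress: aggregates are derived, not stored.
final case class ProgressSketch(sources: Array[SourceStats]) {
  def numInputRows: Long = sources.map(_.numInputRows).sum
  def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
  def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
}

object AggregateCheck {
  val original: ProgressSketch =
    ProgressSketch(Array(SourceStats(10L, 5.0, 4.0), SourceStats(30L, 15.0, 12.0)))

  // Simulates a round trip by rebuilding each source from its individual fields only.
  val rebuilt: ProgressSketch = ProgressSketch(original.sources.map { s =>
    SourceStats(s.numInputRows, s.inputRowsPerSecond, s.processedRowsPerSecond)
  })
}
```

A test along these lines would compare the aggregates of the original and the deserialized progress rather than serializing the aggregates themselves.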





[GitHub] [spark] bogao007 commented on pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "bogao007 (via GitHub)" <gi...@apache.org>.
bogao007 commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1517378241

   cc @rangadi 




[GitHub] [spark] HeartSaVioR commented on pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1547265202

   I'll leave this to you for self-merging, so that you can test your new permission. Congrats again :)




[GitHub] [spark] HeartSaVioR commented on pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1550890958

   Thanks @LuciferYang , I merged this to master.




[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173598454


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -2182,7 +2182,7 @@ class SparkConnectPlanner(val session: SparkSession) {
         respBuilder.setRecentProgress(
           StreamingQueryCommandResult.RecentProgressResult
             .newBuilder()
-            .addAllRecentProgressJson(progressReports.map(_.json).asJava)
+            .addAllRecentProgressJson(progressReports.map(StreamingQueryProgress.jsonString).asJava)

Review Comment:
   The `recentProgressJson` should look like this:
   
   ```
   {
     "id" : "33ac26f4-1c39-46ce-b798-f3d2a21211d4",
     "runId" : "849c2c9a-b9f8-446f-9180-259a60fd888c",
     "name" : "myName",
     "timestamp" : "2016-12-05T20:54:20.827Z",
     "batchId" : 2,
     "batchDuration" : 0,
     "durationMs" : {
       "total" : 0
     },
     ...
     "observedMetrics" : {
       "event1" : {
         "values" : [ 1, 3.0 ],
         "schema" : {
           "type" : "struct",
           "fields" : [ {
             "name" : "c1",
             "type" : "long",
             "nullable" : true,
             "metadata" : { }
           }, {
             "name" : "c2",
             "type" : "double",
             "nullable" : true,
             "metadata" : { }
           } ]
         }
       },
       "event2" : {
         "values" : [ 1, "hello", "world" ],
         "schema" : {
           "type" : "struct",
           "fields" : [ {
             "name" : "rc",
             "type" : "long",
             "nullable" : true,
             "metadata" : { }
           }, {
             "name" : "min_q",
             "type" : "string",
             "nullable" : true,
             "metadata" : { }
           }, {
             "name" : "max_q",
             "type" : "string",
             "nullable" : true,
             "metadata" : { }
           } ]
         }
       }
     }
   }
   ```
   
   Then we can rebuild `observedMetrics` on the Connect client side.
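A sketch of that client-side rebuild, assuming the JSON above has already been parsed into plain Scala values: each `observedMetrics` entry pairs a `values` array with a `schema` whose `fields` give names and types, so the client can zip them back into named, correctly typed columns. `FieldSpec`, `ObservedMetricsRebuild`, `coerce` and `rebuild` are hypothetical names; a real implementation would produce a `GenericRowWithSchema` with a `StructType` rather than a `Map`, and would handle more data types than the three covered here.

```scala
// Hypothetical mirror of one entry in the schema's "fields" array.
// `nullable` is carried along but not enforced in this sketch.
final case class FieldSpec(name: String, dataType: String, nullable: Boolean)

object ObservedMetricsRebuild {
  // A JSON parser may hand back integral numbers as Int, Long or BigInt;
  // normalize each value to the type the schema declares.
  def coerce(value: Any, dataType: String): Any = (value, dataType) match {
    case (null, _)              => null
    case (n: Int, "long")       => n.toLong
    case (n: BigInt, "long")    => n.toLong
    case (n: Long, "long")      => n
    case (n: Int, "double")     => n.toDouble
    case (n: Double, "double")  => n
    case (s: String, "string")  => s
    case (other, t) =>
      throw new IllegalArgumentException(s"Unsupported value $other for type $t")
  }

  // Zip positional values with schema fields into a name -> value map.
  def rebuild(values: Seq[Any], fields: Seq[FieldSpec]): Map[String, Any] = {
    require(values.length == fields.length, "values and schema fields must line up")
    fields.zip(values).map { case (f, v) => f.name -> coerce(v, f.dataType) }.toMap
  }
}
```

For `event1` above, `rebuild(Seq[Any](1, 3.0), ...)` with fields `c1: long` and `c2: double` yields `c1 -> 1L` and `c2 -> 3.0`.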





[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173259564


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())

Review Comment:
   Should we also add `@JsonSetter(nulls = Nulls.AS_EMPTY)` as protection for other collection types?
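The following Python analogue (not Jackson code) illustrates the intent of `Nulls.AS_EMPTY` for a collection field such as `customMetrics`: an explicit JSON `null` is read as an empty map instead of a null reference. `StateOperatorSketch` and `state_operator_from_json` are hypothetical names. Treating a missing key the same way is a choice of this sketch, not a claim about how Jackson handles absent properties.

```python
import json
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class StateOperatorSketch:
    # Hypothetical, trimmed-down stand-in for StateOperatorProgress.
    operator_name: str
    num_rows_total: int
    custom_metrics: Dict[str, int] = field(default_factory=dict)


def state_operator_from_json(raw: str) -> StateOperatorSketch:
    data = json.loads(raw)
    custom = data.get("customMetrics")
    # Same intent as Nulls.AS_EMPTY: an explicit JSON null for a collection
    # becomes an empty map. This sketch also treats a missing key that way.
    if custom is None:
        custom = {}
    return StateOperatorSketch(
        operator_name=data["operatorName"],
        num_rows_total=data["numRowsTotal"],
        custom_metrics=dict(custom),
    )


explicit_null = state_operator_from_json(
    '{"operatorName": "example_op", "numRowsTotal": 3, "customMetrics": null}')
missing = state_operator_from_json('{"operatorName": "example_op", "numRowsTotal": 3}')
print(explicit_null.custom_metrics, missing.custom_metrics)  # {} {}
```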
   
   





[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173508858


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])

Review Comment:
   https://github.com/apache/spark/blob/d407a42090d7c027050be7ee723f7e3d8f686ed7/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2182-L2186
   
   @rangadi @bogao007 I think we should serialize `StreamingQueryProgress` to JSON as a POJO so that the row schema information in `observedMetrics` is preserved. `progressReports.map(_.json)` loses the schema information of `GenericRowWithSchema`, which makes it impossible to deserialize back to `GenericRowWithSchema` on the client side.
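Here is a small Python sketch of the information loss described above. It assumes the plain `json` rendering keeps field names and values but drops the declared Spark types. `ROW`, `encode_values_only`, `encode_with_schema`, and `declared_types` are hypothetical. Without the schema, a `date` value reads back as an ordinary string. The values-plus-schema form keeps every type.

```python
import json

# Hypothetical row: (field name, declared Spark type, value).
ROW = [("rc", "long", 1), ("ratio", "double", 2.0), ("day", "date", "2023-04-21")]


def encode_values_only(row):
    # Field names and values survive; the declared types are dropped.
    return json.dumps({name: value for name, _, value in row})


def encode_with_schema(row):
    # Values plus a struct schema, shaped like the observedMetrics payload.
    return json.dumps({
        "values": [value for _, _, value in row],
        "schema": {
            "type": "struct",
            "fields": [
                {"name": name, "type": spark_type, "nullable": True, "metadata": {}}
                for name, spark_type, _ in row
            ],
        },
    })


def declared_types(raw):
    # Only the schema-carrying form lets the reader recover the column types.
    return [f["type"] for f in json.loads(raw)["schema"]["fields"]]


values_only = json.loads(encode_values_only(ROW))
print(type(values_only["day"]).__name__)       # str: nothing marks it as a date
print(declared_types(encode_with_schema(ROW)))  # ['long', 'double', 'date']
```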
   
   





[GitHub] [spark] LuciferYang commented on pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1517213684

   If this is not consistent with the initial idea of this ticket, please let me know.
   
   




[GitHub] [spark] rangadi commented on a diff in pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "rangadi (via GitHub)" <gi...@apache.org>.
rangadi commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1177220790


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala:
##########
@@ -163,14 +163,14 @@ class RemoteStreamingQuery(
 
   override def recentProgress: Array[StreamingQueryProgress] = {
     executeQueryCmd(_.setRecentProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala
-      .map(json => new StreamingQueryProgress(json))
+      .map(StreamingQueryProgress.fromJson)
       .toArray
   }
 
   override def lastProgress: StreamingQueryProgress = {
     executeQueryCmd(
       _.setLastProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala.headOption
-      .map(json => new StreamingQueryProgress(json))
+      .map(StreamingQueryProgress.fromJson)

Review Comment:
   This is a good idea (at least for now). Essentially, we don't need to add many protobuf messages to match :).
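The approach approved here keeps the wire format as a list of JSON strings (`getRecentProgressJsonList`) and moves the parsing to the client, so no new protobuf messages are needed. Below is a Python sketch of that client-side flow. `ProgressSketch`, `recent_progress`, and `last_progress` are hypothetical stand-ins for the Scala `.map(StreamingQueryProgress.fromJson)` calls in the diff. Returning `None` for an empty list is an assumption, since the Scala code after `headOption` is not shown.

```python
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ProgressSketch:
    # Hypothetical stand-in for StreamingQueryProgress with two fields.
    batch_id: int
    name: Optional[str]

    @staticmethod
    def from_json(raw: str) -> "ProgressSketch":
        data = json.loads(raw)
        return ProgressSketch(batch_id=data["batchId"], name=data.get("name"))


def recent_progress(json_list: List[str]) -> List[ProgressSketch]:
    # Each JSON string is parsed independently, like `.map(StreamingQueryProgress.fromJson)`.
    return [ProgressSketch.from_json(s) for s in json_list]


def last_progress(json_list: List[str]) -> Optional[ProgressSketch]:
    # Like `headOption`: parse the first entry if there is one.
    return ProgressSketch.from_json(json_list[0]) if json_list else None


reports = ['{"batchId": 0, "name": null}', '{"batchId": 1, "name": "q1"}']
print([p.batch_id for p in recent_progress(reports)])  # [0, 1]
```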





[GitHub] [spark] LuciferYang commented on pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1537695404

   Rebased and resolved conflicts.




[GitHub] [spark] LuciferYang commented on pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1538352094

   Friendly ping @HyukjinKwon, can we merge this one?




[GitHub] [spark] HeartSaVioR commented on pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1550888639

   I'll just help merge this one, as it has been open for multiple weeks and we don't want to require this PR to be rebased again.




[GitHub] [spark] HeartSaVioR closed pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api
URL: https://github.com/apache/spark/pull/40892




[GitHub] [spark] LuciferYang commented on pull request #40892: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1547269299

   > I'll leave this to you for self-merging, so that you can test your new permission. Congrats again :)
   
   Thanks @HeartSaVioR :)




[GitHub] [spark] bogao007 commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "bogao007 (via GitHub)" <gi...@apache.org>.
bogao007 commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173392468


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])
+    extends Serializable {
+
+  /** The aggregate (across all sources) number of records processed in a trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum

Review Comment:
   Aggregated metrics like `numInputRows`, `inputRowsPerSecond` and `processedRowsPerSecond` are calculated inside this class. Can they be safely deserialized from the JSON string? It might also be worth adding them to the test case.
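Here is one way to check the question. The aggregate is derived from `sources` on access (`sources.map(_.numInputRows).sum` in the diff), so it survives a round trip as long as the per-source values do. The Python sketch below uses hypothetical `SourceSketch` and `ProgressWithSources` classes. It writes the aggregate into the JSON and ignores it on read, which is an assumption about how such a top-level field would be handled. `inputRowsPerSecond` and `processedRowsPerSecond` are not shown in this hunk, so whether they aggregate the same way is not confirmed here.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class SourceSketch:
    # Hypothetical stand-in for SourceProgress, reduced to one metric.
    num_input_rows: int


@dataclass
class ProgressWithSources:
    # Hypothetical stand-in for StreamingQueryProgress, reduced to its sources.
    sources: List[SourceSketch]

    @property
    def num_input_rows(self) -> int:
        # Derived on access, like `sources.map(_.numInputRows).sum`.
        return sum(s.num_input_rows for s in self.sources)

    def to_json(self) -> str:
        # The aggregate is written out for readers, but it is not a stored field.
        return json.dumps({
            "numInputRows": self.num_input_rows,
            "sources": [{"numInputRows": s.num_input_rows} for s in self.sources],
        })

    @staticmethod
    def from_json(raw: str) -> "ProgressWithSources":
        data = json.loads(raw)
        # The top-level "numInputRows" is ignored; it is recomputed from sources.
        return ProgressWithSources([SourceSketch(s["numInputRows"]) for s in data["sources"]])


original = ProgressWithSources([SourceSketch(10), SourceSketch(32)])
restored = ProgressWithSources.from_json(original.to_json())
assert restored.num_input_rows == original.num_input_rows == 42
```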





[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173602327


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])

Review Comment:
   The Python tests may fail; let's wait for CI.





[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173464572


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])

Review Comment:
   Deserializing `observedMetrics` is broken right now: the `schema` of the resulting `GenericRowWithSchema` is null.
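The failure mode is visible in the JSON shape used by `StreamingQueryProgressSuite`: each `observedMetrics` entry carries a `values` array and a separate `schema` object, and a deserializer that fills in only the values leaves a row that cannot resolve fields by name. Below is a minimal plain-Scala sketch under that assumption. `FieldSketch`, `RowWithSchemaSketch` and `ObservedMetricsSketch` are hypothetical stand-ins, not Spark or Jackson APIs, and the input is modeled as already-parsed nested Maps/Seqs rather than raw JSON.

```scala
// Hypothetical stand-ins for StructField / GenericRowWithSchema; these are NOT Spark classes.
final case class FieldSketch(name: String, dataType: String)

final case class RowWithSchemaSketch(values: Vector[Any], schema: Vector[FieldSketch]) {
  // Name-based access only works if `schema` was filled in during deserialization.
  def getAs[T](fieldName: String): T = {
    require(schema != null, s"schema is null, cannot resolve field '$fieldName'")
    val idx = schema.indexWhere(_.name == fieldName)
    require(idx >= 0, s"no field named '$fieldName'")
    values(idx).asInstanceOf[T]
  }
}

object ObservedMetricsSketch {
  // `entry` is one already-parsed value under "observedMetrics" (e.g. "event1"),
  // modeled as nested Scala Maps/Seqs. Assumption: real code would walk a JSON AST.
  def rowFromEntry(entry: Map[String, Any]): RowWithSchemaSketch = {
    val values = entry("values").asInstanceOf[Seq[Any]].toVector
    val schemaNode = entry("schema").asInstanceOf[Map[String, Any]]
    val fields = schemaNode("fields").asInstanceOf[Seq[Map[String, Any]]].map { f =>
      FieldSketch(f("name").asInstanceOf[String], f("type").asInstanceOf[String])
    }.toVector
    require(fields.size == values.size, "each value needs a matching schema field")
    RowWithSchemaSketch(values, fields)
  }
}

// Mirrors "event1" from the test JSON: values [1, 3.0] with fields c1 (long), c2 (double).
val event1: Map[String, Any] = Map(
  "values" -> Seq[Any](1L, 3.0),
  "schema" -> Map(
    "type" -> "struct",
    "fields" -> Seq(
      Map("name" -> "c1", "type" -> "long"),
      Map("name" -> "c2", "type" -> "double"))))

val row = ObservedMetricsSketch.rowFromEntry(event1)
println(row.getAs[Double]("c2")) // 3.0
```

The point of the sketch is that `values` and `schema` have to be read together; a row built with a null schema fails on the first lookup by field name.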





[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173272269


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (

Review Comment:
   This is only used within `private[spark]` scope, so no MiMa check was added.
   
   





[GitHub] [spark] bogao007 commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "bogao007 (via GitHub)" <gi...@apache.org>.
bogao007 commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1175960363


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.types.StructType
+
+class StreamingQueryProgressSuite extends ConnectFunSuite {
+  test("test serde StreamingQueryProgress from json") {
+    val jsonStringFromServerSide =
+      s"""
+         |{
+         |  "id" : "33ac26f4-1c39-46ce-b798-f3d2a21211d4",
+         |  "runId" : "849c2c9a-b9f8-446f-9180-259a60fd888c",
+         |  "name" : "myName",
+         |  "timestamp" : "2016-12-05T20:54:20.827Z",
+         |  "batchId" : 2,
+         |  "batchDuration" : 0,
+         |  "durationMs" : {
+         |    "total" : 0
+         |  },
+         |  "eventTime" : {
+         |    "min" : "2016-12-05T20:54:20.827Z",
+         |    "avg" : "2016-12-05T20:54:20.827Z",
+         |    "watermark" : "2016-12-05T20:54:20.827Z",
+         |    "max" : "2016-12-05T20:54:20.827Z"
+         |  },
+         |  "stateOperators" : [ {
+         |    "operatorName" : "op1",
+         |    "numRowsTotal" : 0,
+         |    "numRowsUpdated" : 1,
+         |    "allUpdatesTimeMs" : 1,
+         |    "numRowsRemoved" : 2,
+         |    "allRemovalsTimeMs" : 34,
+         |    "commitTimeMs" : 23,
+         |    "memoryUsedBytes" : 3,
+         |    "numRowsDroppedByWatermark" : 0,
+         |    "numShufflePartitions" : 2,
+         |    "numStateStoreInstances" : 2,
+         |    "customMetrics" : {
+         |      "stateOnCurrentVersionSizeBytes" : 2,
+         |      "loadedMapCacheHitCount" : 1,
+         |      "loadedMapCacheMissCount" : 0
+         |    }
+         |  } ],
+         |  "sources" : [ {
+         |    "description" : "source",
+         |    "startOffset" : "123",
+         |    "endOffset" : "456",
+         |    "latestOffset" : "789",
+         |    "numInputRows" : 678,
+         |    "inputRowsPerSecond" : 10.0,
+         |    "processedRowsPerSecond" : "Infinity",
+         |    "metrics" : { }
+         |  } ],
+         |  "sink" : {
+         |    "description" : "sink",
+         |    "numOutputRows" : -1,
+         |    "metrics" : { }
+         |  },
+         |  "observedMetrics" : {
+         |    "event1" : {
+         |      "values" : [ 1, 3.0 ],
+         |      "schema" : {
+         |        "type" : "struct",
+         |        "fields" : [ {
+         |          "name" : "c1",
+         |          "type" : "long",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "c2",
+         |          "type" : "double",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        } ]
+         |      }
+         |    },
+         |    "event2" : {
+         |      "values" : [ 1, "hello", "world" ],
+         |      "schema" : {
+         |        "type" : "struct",
+         |        "fields" : [ {
+         |          "name" : "rc",
+         |          "type" : "long",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "min_q",
+         |          "type" : "string",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "max_q",
+         |          "type" : "string",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        } ]
+         |      }
+         |    }
+         |  }
+         |}
+      """.stripMargin.trim
+
+    // scalastyle:off
+    println(jsonStringFromServerSide)
+
+    val result = StreamingQueryProgress.fromJson(jsonStringFromServerSide)
+    assert(result.id.toString === "33ac26f4-1c39-46ce-b798-f3d2a21211d4")
+    assert(result.runId.toString === "849c2c9a-b9f8-446f-9180-259a60fd888c")
+    assert(result.numInputRows === 678)

Review Comment:
   It may be better to include multiple sources in the test case and check that `numInputRows` equals the sum of the `numInputRows` values of the individual sources.
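In the native Scala API, `StreamingQueryProgress.numInputRows` is not a stored field. It is the sum of `numInputRows` over `sources`, so a test with a single source cannot tell "sum over sources" apart from "value of the only source". Here is a minimal sketch of that relationship. `SourceSketch` and `ProgressSketch` are hypothetical, simplified classes, not Spark's.

```scala
// Hypothetical, simplified stand-ins; the real SourceProgress / StreamingQueryProgress
// carry many more fields (offsets, rates, metrics, ...).
final case class SourceSketch(description: String, numInputRows: Long)

final case class ProgressSketch(sources: Seq[SourceSketch]) {
  // Aggregate input rows for the trigger: the sum over all sources (0 if there are none).
  def numInputRows: Long = sources.map(_.numInputRows).sum
}

// Two sources, so the aggregate differs from any single source's count.
val progress = ProgressSketch(Seq(
  SourceSketch("source1", 678L),
  SourceSketch("source2", 789L)))

println(progress.numInputRows) // 1467
```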



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])

Review Comment:
   Yep, we would not be able to deserialize `observedMetrics` without this change, thanks for the update!
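One part of that change, `@JsonSetter(nulls = Nulls.AS_EMPTY)`, asks Jackson to replace an explicit JSON `null` for `observedMetrics` with an empty map. Consumers can then iterate it without null checks. The plain-Scala sketch below shows that normalization. `NullsAsEmptySketch` and its methods are hypothetical and only mirror the intended effect; they are not Jackson's implementation.

```scala
import java.{util => ju}

object NullsAsEmptySketch {
  // Hypothetical helper (not a Jackson or Spark API): substitute an empty map for a
  // null one, which is the effect @JsonSetter(nulls = Nulls.AS_EMPTY) asks Jackson for
  // when the JSON holds an explicit null for a map-typed property.
  def nullAsEmpty[K, V](m: ju.Map[K, V]): ju.Map[K, V] =
    if (m == null) new ju.HashMap[K, V]() else m

  // A consumer that iterates without its own null check; returns the keys sorted.
  def metricNames[V](observed: ju.Map[String, V]): List[String] = {
    val names = List.newBuilder[String]
    val it = nullAsEmpty(observed).keySet().iterator()
    while (it.hasNext()) names += it.next()
    names.result().sorted
  }
}

val observed = new ju.HashMap[String, Any]()
observed.put("event2", 2)
observed.put("event1", 1)
println(NullsAsEmptySketch.metricNames(observed)) // List(event1, event2)
println(NullsAsEmptySketch.metricNames[Any](null)) // List()
```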





[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1175980270


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.types.StructType
+
+class StreamingQueryProgressSuite extends ConnectFunSuite {
+  test("test serde StreamingQueryProgress from json") {
+    val jsonStringFromServerSide =
+      s"""
+         |{
+         |  "id" : "33ac26f4-1c39-46ce-b798-f3d2a21211d4",
+         |  "runId" : "849c2c9a-b9f8-446f-9180-259a60fd888c",
+         |  "name" : "myName",
+         |  "timestamp" : "2016-12-05T20:54:20.827Z",
+         |  "batchId" : 2,
+         |  "batchDuration" : 0,
+         |  "durationMs" : {
+         |    "total" : 0
+         |  },
+         |  "eventTime" : {
+         |    "min" : "2016-12-05T20:54:20.827Z",
+         |    "avg" : "2016-12-05T20:54:20.827Z",
+         |    "watermark" : "2016-12-05T20:54:20.827Z",
+         |    "max" : "2016-12-05T20:54:20.827Z"
+         |  },
+         |  "stateOperators" : [ {
+         |    "operatorName" : "op1",
+         |    "numRowsTotal" : 0,
+         |    "numRowsUpdated" : 1,
+         |    "allUpdatesTimeMs" : 1,
+         |    "numRowsRemoved" : 2,
+         |    "allRemovalsTimeMs" : 34,
+         |    "commitTimeMs" : 23,
+         |    "memoryUsedBytes" : 3,
+         |    "numRowsDroppedByWatermark" : 0,
+         |    "numShufflePartitions" : 2,
+         |    "numStateStoreInstances" : 2,
+         |    "customMetrics" : {
+         |      "stateOnCurrentVersionSizeBytes" : 2,
+         |      "loadedMapCacheHitCount" : 1,
+         |      "loadedMapCacheMissCount" : 0
+         |    }
+         |  } ],
+         |  "sources" : [ {
+         |    "description" : "source",
+         |    "startOffset" : "123",
+         |    "endOffset" : "456",
+         |    "latestOffset" : "789",
+         |    "numInputRows" : 678,
+         |    "inputRowsPerSecond" : 10.0,
+         |    "processedRowsPerSecond" : "Infinity",
+         |    "metrics" : { }
+         |  } ],
+         |  "sink" : {
+         |    "description" : "sink",
+         |    "numOutputRows" : -1,
+         |    "metrics" : { }
+         |  },
+         |  "observedMetrics" : {
+         |    "event1" : {
+         |      "values" : [ 1, 3.0 ],
+         |      "schema" : {
+         |        "type" : "struct",
+         |        "fields" : [ {
+         |          "name" : "c1",
+         |          "type" : "long",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "c2",
+         |          "type" : "double",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        } ]
+         |      }
+         |    },
+         |    "event2" : {
+         |      "values" : [ 1, "hello", "world" ],
+         |      "schema" : {
+         |        "type" : "struct",
+         |        "fields" : [ {
+         |          "name" : "rc",
+         |          "type" : "long",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "min_q",
+         |          "type" : "string",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "max_q",
+         |          "type" : "string",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        } ]
+         |      }
+         |    }
+         |  }
+         |}
+      """.stripMargin.trim
+
+    // scalastyle:off
+    println(jsonStringFromServerSide)
+
+    val result = StreamingQueryProgress.fromJson(jsonStringFromServerSide)
+    assert(result.id.toString === "33ac26f4-1c39-46ce-b798-f3d2a21211d4")
+    assert(result.runId.toString === "849c2c9a-b9f8-446f-9180-259a60fd888c")
+    assert(result.numInputRows === 678)

Review Comment:
   Added a second `sources` item, so the assertion is now `assert(result.numInputRows === 1467) // 678 + 789`.
   
   





[GitHub] [spark] HyukjinKwon commented on pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #40892:
URL: https://github.com/apache/spark/pull/40892#issuecomment-1517226334

   cc @HeartSaVioR 




[GitHub] [spark] LuciferYang commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173464572


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The processing duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   Detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])

Review Comment:
   The deserialized result of `observedMetrics` has a problem: the `schema` of `GenericRowWithSchema` is null.
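   The comment above says that after Jackson deserialization, the `GenericRowWithSchema` values in `observedMetrics` come back with a null `schema`. In Spark, name-based accessors such as `Row.getAs[T](fieldName)` resolve the name through the row's schema. With a null schema they cannot work, and only positional access remains usable. The sketch below models this with hypothetical stdlib-only classes (`MiniSchema`, `MiniRowWithSchema`, `NullSchemaDemo`), not Spark's own types. The "carry the field names and reattach them" remedy is an assumption for illustration, not the fix adopted in this PR.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Spark's Row or
// GenericRowWithSchema classes, just a stdlib model of the same idea.
final case class MiniSchema(fieldNames: Vector[String]) {
  def fieldIndex(name: String): Int = {
    val i = fieldNames.indexOf(name)
    if (i < 0) throw new IllegalArgumentException(s"""Field "$name" does not exist.""")
    i
  }
}

// A row that resolves field names through its schema.
// `schema` may be null, which models what a schema-less deserializer produces.
final class MiniRowWithSchema(val values: Vector[Any], val schema: MiniSchema) {
  // Positional access never touches the schema.
  def get(i: Int): Any = values(i)
  // Name-based access goes through the schema, so it throws a
  // NullPointerException when schema is null.
  def getAs[T](name: String): T = get(schema.fieldIndex(name)).asInstanceOf[T]
}

object NullSchemaDemo {
  // Simulates a deserializer that restores only the values.
  def deserializeValuesOnly(values: Vector[Any]): MiniRowWithSchema =
    new MiniRowWithSchema(values, null)

  // Assumed remedy: serialize the field names too and reattach them.
  def deserializeWithSchema(values: Vector[Any], names: Vector[String]): MiniRowWithSchema =
    new MiniRowWithSchema(values, MiniSchema(names))

  def main(args: Array[String]): Unit = {
    val broken = deserializeValuesOnly(Vector(10L, 3L))
    println(broken.get(0)) // positional access still works
    val failed =
      try { broken.getAs[Long]("rows"); false }
      catch { case _: NullPointerException => true }
    println(s"name lookup failed: $failed")

    val fixed = deserializeWithSchema(Vector(10L, 3L), Vector("rows", "errors"))
    println(fixed.getAs[Long]("errors")) // name lookup works once names are restored
  }
}
```

   Positional reads such as `get(0)` succeed either way. Only lookups by field name depend on the schema surviving the round trip.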



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: reviews-help@spark.apache.org