Posted to reviews@spark.apache.org by "bogao007 (via GitHub)" <gi...@apache.org> on 2023/04/21 07:15:17 UTC

[GitHub] [spark] bogao007 commented on a diff in pull request #40892: [SPARK-43128][CONNECT] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

bogao007 commented on code in PR #40892:
URL: https://github.com/apache/spark/pull/40892#discussion_r1173392468


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala:
##########
@@ -17,6 +17,297 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.{JsonSetter, Nulls}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. a UTC timestamp.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The duration of processing each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *     "max" -> "2016-12-05T20:54:20.827Z"        // maximum event time seen in this trigger
+ *     "min" -> "2016-12-05T20:54:20.827Z"        // minimum event time seen in this trigger
+ *     "avg" -> "2016-12-05T20:54:20.827Z"        // average event time seen in this trigger
+ *     "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   Detailed statistics on the data read from each streaming source.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonSetter(nulls = Nulls.AS_EMPTY)
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])
+    extends Serializable {
+
+  /** The aggregate (across all sources) number of records processed in a trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum

Review Comment:
   Aggregated metrics like `numInputRows`, `inputRowsPerSecond` and `processedRowsPerSecond` are calculated inside this class. Can they be safely deserialized from the JSON string? It might also be worth adding them to the test case.
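   To make the deserialization concern concrete, here is a minimal, self-contained sketch of the pattern in question (plain json4s with illustrative names only, not Spark's actual API): because the aggregate is a `def` over the parsed sources, there is no JSON field to read back for it; it is simply recomputed after deserialization, so the round trip is safe as long as `sources` itself deserializes correctly.

   ```scala
   // Minimal model of the pattern (illustrative names, not Spark's API):
   // the aggregate is a `def` over the parsed sources, so it is recomputed
   // after deserialization instead of being read from its own JSON field.
   import org.json4s._
   import org.json4s.jackson.JsonMethods._

   case class MiniSource(numInputRows: Long)

   class MiniProgress(val sources: Seq[MiniSource]) {
     def numInputRows: Long = sources.map(_.numInputRows).sum // recomputed on access
   }

   object MiniProgress {
     implicit val formats: Formats = DefaultFormats

     def fromJson(json: String): MiniProgress =
       new MiniProgress((parse(json) \ "sources").extract[List[MiniSource]])
   }

   object RoundTripDemo extends App {
     val restored = MiniProgress.fromJson(
       """{"sources": [{"numInputRows": 3}, {"numInputRows": 4}]}""")
     assert(restored.numInputRows == 7) // aggregate falls out of the parsed sources
   }
   ```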


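   For the test-case suggestion, a hedged sketch of the assertions that could be added to a round-trip test (`testProgress` and a `StreamingQueryProgress.fromJson` entry point are assumed names here; neither is shown in this hunk):

   ```scala
   // Hypothetical test additions; `testProgress` and `fromJson` are assumed names.
   // Exact equality on the rates assumes `sources` round-trips losslessly.
   val restored = StreamingQueryProgress.fromJson(testProgress.json)
   assert(restored.numInputRows == testProgress.numInputRows)
   assert(restored.inputRowsPerSecond == testProgress.inputRowsPerSecond)
   assert(restored.processedRowsPerSecond == testProgress.processedRowsPerSecond)
   ```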

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

