Posted to reviews@spark.apache.org by "jerrypeng (via GitHub)" <gi...@apache.org> on 2024/02/14 23:54:58 UTC

[PR] [SPARK-47052][WIP] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]

jerrypeng opened a new pull request, #45109:
URL: https://github.com/apache/spark/pull/45109

   
   
   ### What changes were proposed in this pull request?
   
   To improve code clarity and maintainability, I propose moving all of the variables that track mutable state and metrics for a streaming query into a separate class. With this refactor, it becomes easy to track and find all of the mutable state a microbatch can have.
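
   As a rough sketch of the shape this refactor takes (using names that appear
   in the review diffs below; the fields shown are illustrative, not
   exhaustive), the batch-scoped mutable state moves out of MicroBatchExecution
   into a context object created for each batch:

       // Sketch only: a dedicated holder for the mutable state of one microbatch,
       // so MicroBatchExecution itself no longer carries batch-scoped vars.
       class MicroBatchExecutionContext(
           var batchId: Long,                 // id of the batch being planned/run
           var startOffsets: StreamProgress,  // offsets committed before this batch
           var endOffsets: StreamProgress)    // offsets this batch will process up to

       // The execution loop then threads the context through explicitly, e.g.:
       //   populateStartOffsets(execCtx, sparkSessionToRunBatches)
       //   if (isNewDataAvailable(execCtx)) { /* run the batch against execCtx */ }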
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47052][SS] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45109: [SPARK-47052][SS] Separate state tracking variables from MicroBatchExecution/StreamExecution
URL: https://github.com/apache/spark/pull/45109


Re: [PR] [SPARK-47052][SS] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45109:
URL: https://github.com/apache/spark/pull/45109#issuecomment-1955844637

   The CI failed only in pyspark-connect, and the failures look unrelated (not related to streaming).


Re: [PR] [SPARK-47052] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45109:
URL: https://github.com/apache/spark/pull/45109#discussion_r1495255965


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -414,35 +497,91 @@ trait ProgressReporter extends Logging {
     }
   }
 
-  /** Extracts observed metrics from the most recent query execution. */
-  private def extractObservedMetrics(
-      hasNewData: Boolean,
-      lastExecution: QueryExecution): Map[String, Row] = {
-    if (!hasNewData || lastExecution == null) {
-      return Map.empty
+  /** Extract statistics about stateful operators from the executed query plan. */
+  private def extractStateOperatorMetrics(
+    lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {
+    assert(lastExecution != null, "lastExecution is not available")
+    lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[StateStoreWriter] =>
+        p.asInstanceOf[StateStoreWriter].getProgress()
     }
-    lastExecution.observedMetrics
   }
 
-  /** Records the duration of running `body` for the next query progress update. */
-  protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
-    val startTime = triggerClock.getTimeMillis()
-    val result = body
-    val endTime = triggerClock.getTimeMillis()
-    val timeTaken = math.max(endTime - startTime, 0)
+  /** Extracts statistics from the most recent query execution. */
+  private def extractExecutionStats(
+    hasNewData: Boolean,

Review Comment:
   nit: 2 more spaces
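
   For context on this recurring nit: Spark's Scala style indents multi-line
   parameter declarations by 4 spaces, two more than the 2-space continuation
   used in the diff above. A minimal illustration (method body elided):

       // As written in the diff (2-space continuation):
       private def extractStateOperatorMetrics(
         lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = ???

       // As the reviewer requests (4-space continuation):
       private def extractStateOperatorMetrics(
           lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = ???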



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -414,35 +497,91 @@ trait ProgressReporter extends Logging {
     }
   }
 
-  /** Extracts observed metrics from the most recent query execution. */
-  private def extractObservedMetrics(
-      hasNewData: Boolean,
-      lastExecution: QueryExecution): Map[String, Row] = {
-    if (!hasNewData || lastExecution == null) {
-      return Map.empty
+  /** Extract statistics about stateful operators from the executed query plan. */
+  private def extractStateOperatorMetrics(
+    lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {
+    assert(lastExecution != null, "lastExecution is not available")
+    lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[StateStoreWriter] =>
+        p.asInstanceOf[StateStoreWriter].getProgress()
     }
-    lastExecution.observedMetrics
   }
 
-  /** Records the duration of running `body` for the next query progress update. */
-  protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
-    val startTime = triggerClock.getTimeMillis()
-    val result = body
-    val endTime = triggerClock.getTimeMillis()
-    val timeTaken = math.max(endTime - startTime, 0)
+  /** Extracts statistics from the most recent query execution. */
+  private def extractExecutionStats(
+    hasNewData: Boolean,
+    sourceToNumInputRows: Map[SparkDataStream, Long],
+    lastExecution: IncrementalExecution): ExecutionStats = {
+    val hasEventTime = progressReporter.logicalPlan().collect {
+      case e: EventTimeWatermark => e
+    }.nonEmpty
 
-    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
-    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
-    logDebug(s"$triggerDetailKey took $timeTaken ms")
-    result
+    val watermarkTimestamp =
+      if (hasEventTime) {
+        Map("watermark" -> progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+      } else Map.empty[String, String]
+
+    // SPARK-19378: Still report metrics even though no data was processed while reporting progress.
+    val stateOperators = extractStateOperatorMetrics(lastExecution)
+
+    val sinkOutput = sinkCommitProgress.map(_.numOutputRows)
+
+    if (!hasNewData) {
+      return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp, sinkOutput)
+    }
+
+    val eventTimeStats = lastExecution.executedPlan
+      .collect {
+        case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+          val stats = e.eventTimeStats.value
+          Map(
+            "max" -> stats.max,
+            "min" -> stats.min,
+            "avg" -> stats.avg.toLong).transform((_, v) => progressReporter.formatTimestamp(v))
+      }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(sourceToNumInputRows, stateOperators, eventTimeStats.toMap, sinkOutput)
   }
 
-  protected def formatTimestamp(millis: Long): String = {
-    timestampFormat.format(Instant.ofEpochMilli(millis))
+  /**
+   * Reset values in the execution stats to remove the values which are specific to the batch.
+   * New execution stats will only retain the values as a snapshot of the query status.
+   * (E.g. for stateful operators, numRowsTotal is a snapshot of the status, whereas
+   * numRowsUpdated is bound to the batch.)
+   * TODO: We do not seem to clear up all values in StateOperatorProgress which are bound to the
+   * batch. Fix this.
+   */
+  private def resetExecStatsForNoExecution(originExecStats: ExecutionStats): ExecutionStats = {
+    val newStatefulOperators = originExecStats.stateOperators.map { so =>
+      so.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0)
+    }
+    val newEventTimeStats = if (originExecStats.eventTimeStats.contains("watermark")) {
+      Map("watermark" -> progressReporter.formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+    } else {
+      Map.empty[String, String]
+    }
+    val newOutputRows = originExecStats.outputRows.map(_ => 0L)
+    originExecStats.copy(
+      inputRows = Map.empty[SparkDataStream, Long],
+      stateOperators = newStatefulOperators,
+      eventTimeStats = newEventTimeStats,
+      outputRows = newOutputRows)
   }
 
-  /** Updates the message returned in `status`. */
-  protected def updateStatusMessage(message: String): Unit = {
-    currentStatus = currentStatus.copy(message = message)
+  /** Extracts observed metrics from the most recent query execution. */
+  private def extractObservedMetrics(
+    lastExecution: QueryExecution): Map[String, Row] = {

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -135,75 +219,121 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded.
    */
-  protected def recordTriggerOffsets(
-      from: StreamProgress,
-      to: StreamProgress,
-      latest: StreamProgress): Unit = {
+  def recordTriggerOffsets(
+    from: StreamProgress,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -135,75 +219,121 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded.
    */
-  protected def recordTriggerOffsets(
-      from: StreamProgress,
-      to: StreamProgress,
-      latest: StreamProgress): Unit = {
+  def recordTriggerOffsets(
+    from: StreamProgress,
+    to: StreamProgress,
+    latest: StreamProgress): Unit = {
     currentTriggerStartOffsets = from.transform((_, v) => v.json)
     currentTriggerEndOffsets = to.transform((_, v) => v.json)
     currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
     latestStreamProgress = to
   }
 
-  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= sparkSession.sessionState.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
-      }
-    }
-  }
+  /** Finalizes the trigger which did not execute a batch. */
+  def finishNoExecutionTrigger(lastExecutedEpochId: Long): Unit = {
+    currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+    val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
 
-  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
-    // Reset noDataEventTimestamp if we processed any data
-    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    val execStatsOnNoExecution = execStatsOnLatestExecutedBatch.map(resetExecStatsForNoExecution)
 
-    addNewProgress(newProgress)
-    postEvent(new QueryProgressEvent(newProgress))
-    logInfo(s"Streaming query made progress: $newProgress")
-  }
+    val newProgress = constructNewProgress(processingTimeMills, lastExecutedEpochId,
+      execStatsOnNoExecution, Map.empty[String, Row])
 
-  private def updateIdleness(newProgress: StreamingQueryProgress): Unit = {
-    val now = triggerClock.getTimeMillis()
-    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
-      addNewProgress(newProgress)
-      if (lastNoExecutionProgressEventTime > Long.MinValue) {
-        postEvent(new QueryIdleEvent(newProgress.id, newProgress.runId,
-          formatTimestamp(currentTriggerStartTimestamp)))
-        logInfo(s"Streaming query has been idle and waiting for new data more than " +
-          s"$noDataProgressEventInterval ms.")
-      }
+    progressReporter.updateIdleness(id, runId, currentTriggerStartTimestamp, newProgress)
 
-      lastNoExecutionProgressEventTime = now
-    }
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  /**
+   * Retrieve a measured duration
+   */
+  def getDuration(key: String): Option[Long] = {
+    currentDurationsMs.get(key)
   }
 
   /**
    * Finalizes the query progress and adds it to list of recent status updates.
    *
    * @param hasNewData Whether the sources of this stream had new data for this trigger.
-   * @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that
-   *                    perform stateful aggregations with timeouts can still run batches even
-   *                    though the sources don't have any new data.
    */
-  protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
-    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
-      currentTriggerLatestOffsets != null)
+  def finishTrigger(
+    hasNewData: Boolean,
+    sourceToNumInputRowsMap: Map[SparkDataStream, Long],
+    lastExecution: IncrementalExecution,
+    lastEpochId: Long): Unit = {
+    assert(
+      currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
+        currentTriggerLatestOffsets != null
+    )
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
-
-    val executionStats = extractExecutionStats(hasNewData, hasExecuted)
     val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
+    assert(lastExecution != null, "executed batch should provide the information for execution.")
+    val execStats = extractExecutionStats(hasNewData, sourceToNumInputRowsMap, lastExecution)
+    logDebug(s"Execution stats: $execStats")
+
+    val observedMetrics = extractObservedMetrics(lastExecution)
+    val newProgress = constructNewProgress(processingTimeMills, lastEpochId, Some(execStats),
+      observedMetrics)
+
+    progressReporter.lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    progressReporter.updateProgress(newProgress)
+
+    // Update the value since this trigger executes a batch successfully.
+    this.execStatsOnLatestExecutedBatch = Some(execStats)
+
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  private def constructNewProgress(
+    processingTimeMills: Long,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -414,35 +497,91 @@ trait ProgressReporter extends Logging {
     }
   }
 
-  /** Extracts observed metrics from the most recent query execution. */
-  private def extractObservedMetrics(
-      hasNewData: Boolean,
-      lastExecution: QueryExecution): Map[String, Row] = {
-    if (!hasNewData || lastExecution == null) {
-      return Map.empty
+  /** Extract statistics about stateful operators from the executed query plan. */
+  private def extractStateOperatorMetrics(
+    lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -220,94 +350,47 @@ trait ProgressReporter extends Logging {
         metrics = sourceMetrics
       )
     }
+  }
 
-    val sinkOutput = if (hasExecuted) {
-      sinkCommitProgress.map(_.numOutputRows)
-    } else {
-      sinkCommitProgress.map(_ => 0L)
-    }
-
+  private def extractSinkProgress(execStats: Option[ExecutionStats]): SinkProgress = {
+    val sinkOutput = execStats.flatMap(_.outputRows)
     val sinkMetrics = sink match {
-      case withMetrics: ReportsSinkMetrics =>
-        withMetrics.metrics()
+      case withMetrics: ReportsSinkMetrics => withMetrics.metrics()
       case _ => Map[String, String]().asJava
     }
 
-    val sinkProgress = SinkProgress(
-      sink.toString, sinkOutput, sinkMetrics)
-
-    val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
-
-    val newProgress = new StreamingQueryProgress(
-      id = id,
-      runId = runId,
-      name = name,
-      timestamp = formatTimestamp(currentTriggerStartTimestamp),
-      batchId = currentBatchId,
-      batchDuration = processingTimeMills,
-      durationMs =
-        new java.util.HashMap(currentDurationsMs.toMap.transform((_, v) => long2Long(v)).asJava),
-      eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
-      stateOperators = executionStats.stateOperators.toArray,
-      sources = sourceProgress.toArray,
-      sink = sinkProgress,
-      observedMetrics = new java.util.HashMap(observedMetrics.asJava))
-
-    if (hasExecuted) {
-      updateProgress(newProgress)
-    } else {
-      updateIdleness(newProgress)
-    }
-
-    currentStatus = currentStatus.copy(isTriggerActive = false)
+    SinkProgress(sink.toString, sinkOutput, sinkMetrics)
   }
 
-  /** Extract statistics about stateful operators from the executed query plan. */
-  private def extractStateOperatorMetrics(hasExecuted: Boolean): Seq[StateOperatorProgress] = {
-    if (lastExecution == null) return Nil
-    // lastExecution could belong to one of the previous triggers if `!hasExecuted`.
-    // Walking the plan again should be inexpensive.
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[StateStoreWriter] =>
-        val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasExecuted) {
-          progress
-        } else {
-          progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0)
-        }
-    }
+  /**
+   * Override of finishTrigger to extract the map from IncrementalExecution.
+   */
+  def finishTrigger(
+    hasNewData: Boolean,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -220,94 +350,47 @@ trait ProgressReporter extends Logging {
         metrics = sourceMetrics
       )
     }
+  }
 
-    val sinkOutput = if (hasExecuted) {
-      sinkCommitProgress.map(_.numOutputRows)
-    } else {
-      sinkCommitProgress.map(_ => 0L)
-    }
-
+  private def extractSinkProgress(execStats: Option[ExecutionStats]): SinkProgress = {
+    val sinkOutput = execStats.flatMap(_.outputRows)
     val sinkMetrics = sink match {
-      case withMetrics: ReportsSinkMetrics =>
-        withMetrics.metrics()
+      case withMetrics: ReportsSinkMetrics => withMetrics.metrics()
       case _ => Map[String, String]().asJava
     }
 
-    val sinkProgress = SinkProgress(
-      sink.toString, sinkOutput, sinkMetrics)
-
-    val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)
-
-    val newProgress = new StreamingQueryProgress(
-      id = id,
-      runId = runId,
-      name = name,
-      timestamp = formatTimestamp(currentTriggerStartTimestamp),
-      batchId = currentBatchId,
-      batchDuration = processingTimeMills,
-      durationMs =
-        new java.util.HashMap(currentDurationsMs.toMap.transform((_, v) => long2Long(v)).asJava),
-      eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
-      stateOperators = executionStats.stateOperators.toArray,
-      sources = sourceProgress.toArray,
-      sink = sinkProgress,
-      observedMetrics = new java.util.HashMap(observedMetrics.asJava))
-
-    if (hasExecuted) {
-      updateProgress(newProgress)
-    } else {
-      updateIdleness(newProgress)
-    }
-
-    currentStatus = currentStatus.copy(isTriggerActive = false)
+    SinkProgress(sink.toString, sinkOutput, sinkMetrics)
   }
 
-  /** Extract statistics about stateful operators from the executed query plan. */
-  private def extractStateOperatorMetrics(hasExecuted: Boolean): Seq[StateOperatorProgress] = {
-    if (lastExecution == null) return Nil
-    // lastExecution could belong to one of the previous triggers if `!hasExecuted`.
-    // Walking the plan again should be inexpensive.
-    lastExecution.executedPlan.collect {
-      case p if p.isInstanceOf[StateStoreWriter] =>
-        val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasExecuted) {
-          progress
-        } else {
-          progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0)
-        }
-    }
+  /**
+   * Override of finishTrigger to extract the map from IncrementalExecution.
+   */
+  def finishTrigger(
+    hasNewData: Boolean,
+    lastExecution: IncrementalExecution,
+    lastEpoch: Long): Unit = {
+    val map: Map[SparkDataStream, Long] =
+      if (hasNewData) extractSourceToNumInputRows(lastExecution) else Map.empty
+    finishTrigger(hasNewData, map, lastExecution, lastEpoch)
   }
 
-  /** Extracts statistics from the most recent query execution. */
-  private def extractExecutionStats(hasNewData: Boolean, hasExecuted: Boolean): ExecutionStats = {
-    val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
-    val watermarkTimestamp =
-      if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
-      else Map.empty[String, String]
-
-    // SPARK-19378: Still report metrics even though no data was processed while reporting progress.
-    val stateOperators = extractStateOperatorMetrics(hasExecuted)
-
-    if (!hasNewData) {
-      return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
+  private def warnIfFinishTriggerTakesTooLong(
+    triggerEndTimestamp: Long,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -135,75 +219,121 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded.
    */
-  protected def recordTriggerOffsets(
-      from: StreamProgress,
-      to: StreamProgress,
-      latest: StreamProgress): Unit = {
+  def recordTriggerOffsets(
+    from: StreamProgress,
+    to: StreamProgress,
+    latest: StreamProgress): Unit = {
     currentTriggerStartOffsets = from.transform((_, v) => v.json)
     currentTriggerEndOffsets = to.transform((_, v) => v.json)
     currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
     latestStreamProgress = to
   }
 
-  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= sparkSession.sessionState.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
-      }
-    }
-  }
+  /** Finalizes the trigger which did not execute a batch. */
+  def finishNoExecutionTrigger(lastExecutedEpochId: Long): Unit = {
+    currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+    val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
 
-  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
-    // Reset noDataEventTimestamp if we processed any data
-    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    val execStatsOnNoExecution = execStatsOnLatestExecutedBatch.map(resetExecStatsForNoExecution)
 
-    addNewProgress(newProgress)
-    postEvent(new QueryProgressEvent(newProgress))
-    logInfo(s"Streaming query made progress: $newProgress")
-  }
+    val newProgress = constructNewProgress(processingTimeMills, lastExecutedEpochId,
+      execStatsOnNoExecution, Map.empty[String, Row])
 
-  private def updateIdleness(newProgress: StreamingQueryProgress): Unit = {
-    val now = triggerClock.getTimeMillis()
-    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
-      addNewProgress(newProgress)
-      if (lastNoExecutionProgressEventTime > Long.MinValue) {
-        postEvent(new QueryIdleEvent(newProgress.id, newProgress.runId,
-          formatTimestamp(currentTriggerStartTimestamp)))
-        logInfo(s"Streaming query has been idle and waiting for new data more than " +
-          s"$noDataProgressEventInterval ms.")
-      }
+    progressReporter.updateIdleness(id, runId, currentTriggerStartTimestamp, newProgress)
 
-      lastNoExecutionProgressEventTime = now
-    }
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  /**
+   * Retrieve a measured duration
+   */
+  def getDuration(key: String): Option[Long] = {
+    currentDurationsMs.get(key)
   }
 
   /**
    * Finalizes the query progress and adds it to list of recent status updates.
    *
    * @param hasNewData Whether the sources of this stream had new data for this trigger.
-   * @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that
-   *                    perform stateful aggregations with timeouts can still run batches even
-   *                    though the sources don't have any new data.
    */
-  protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
-    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
-      currentTriggerLatestOffsets != null)
+  def finishTrigger(
+    hasNewData: Boolean,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -40,89 +41,172 @@ import org.apache.spark.util.Clock
 
 /**
  * Responsible for continually reporting statistics about the amount of data processed as well
- * as latency for a streaming query.  This trait is designed to be mixed into the
- * [[StreamExecution]], who is responsible for calling `startTrigger` and `finishTrigger`
- * at the appropriate times. Additionally, the status can updated with `updateStatusMessage` to
- * allow reporting on the streams current state (i.e. "Fetching more data").
+ * as latency for a streaming query.  This class is designed to hold information about
+ * a streaming query and contains methods that can be used on a streaming query,
+ * such as getting the most recent progress of the query.
  */
-trait ProgressReporter extends Logging {
+class ProgressReporter(
+    private val sparkSession: SparkSession,
+    private val triggerClock: Clock,
+    val logicalPlan: () => LogicalPlan)
+  extends Logging {
 
-  case class ExecutionStats(
-    inputRows: Map[SparkDataStream, Long],
-    stateOperators: Seq[StateOperatorProgress],
-    eventTimeStats: Map[String, String])
-
-  // Internal state of the stream, required for computing metrics.
-  protected def id: UUID
-  protected def runId: UUID
-  protected def name: String
-  protected def triggerClock: Clock
-  protected def logicalPlan: LogicalPlan
-  protected def lastExecution: QueryExecution
-  protected def newData: Map[SparkDataStream, LogicalPlan]
-  protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
-  protected def sources: Seq[SparkDataStream]
-  protected def sink: Table
+  // The timestamp we report an event that has not executed anything
+  var lastNoExecutionProgressEventTime = Long.MinValue
+
+  /** Holds the most recent query progress updates.  Accesses must lock on the queue itself. */
+  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+  val noDataProgressEventInterval: Long =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
+  private val timestampFormat =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      .withZone(DateTimeUtils.getZoneId("UTC"))
+
+  /** Returns an array containing the most recent query progress updates. */
+  def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+    progressBuffer.toArray
+  }
+
+  /** Returns the most recent query progress update or null if there were no progress updates. */
+  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+    progressBuffer.lastOption.orNull
+  }
+
+  def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+    // Reset noDataEventTimestamp if we processed any data
+    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+
+    addNewProgress(newProgress)
+    postEvent(new QueryProgressEvent(newProgress))
+    logInfo(s"Streaming query made progress: $newProgress")
+  }
+
+  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= sparkSession.sessionState.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+  }
+
+  def updateIdleness(
+    id: UUID,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -135,75 +219,121 @@ trait ProgressReporter extends Logging {
    * Record the offsets range this trigger will process. Call this before updating
    * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded.
    */
-  protected def recordTriggerOffsets(
-      from: StreamProgress,
-      to: StreamProgress,
-      latest: StreamProgress): Unit = {
+  def recordTriggerOffsets(
+    from: StreamProgress,
+    to: StreamProgress,
+    latest: StreamProgress): Unit = {
     currentTriggerStartOffsets = from.transform((_, v) => v.json)
     currentTriggerEndOffsets = to.transform((_, v) => v.json)
     currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
     latestStreamProgress = to
   }
 
-  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= sparkSession.sessionState.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
-      }
-    }
-  }
+  /** Finalizes the trigger which did not execute a batch. */
+  def finishNoExecutionTrigger(lastExecutedEpochId: Long): Unit = {
+    currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+    val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
 
-  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
-    // Reset noDataEventTimestamp if we processed any data
-    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    val execStatsOnNoExecution = execStatsOnLatestExecutedBatch.map(resetExecStatsForNoExecution)
 
-    addNewProgress(newProgress)
-    postEvent(new QueryProgressEvent(newProgress))
-    logInfo(s"Streaming query made progress: $newProgress")
-  }
+    val newProgress = constructNewProgress(processingTimeMills, lastExecutedEpochId,
+      execStatsOnNoExecution, Map.empty[String, Row])
 
-  private def updateIdleness(newProgress: StreamingQueryProgress): Unit = {
-    val now = triggerClock.getTimeMillis()
-    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
-      addNewProgress(newProgress)
-      if (lastNoExecutionProgressEventTime > Long.MinValue) {
-        postEvent(new QueryIdleEvent(newProgress.id, newProgress.runId,
-          formatTimestamp(currentTriggerStartTimestamp)))
-        logInfo(s"Streaming query has been idle and waiting for new data more than " +
-          s"$noDataProgressEventInterval ms.")
-      }
+    progressReporter.updateIdleness(id, runId, currentTriggerStartTimestamp, newProgress)
 
-      lastNoExecutionProgressEventTime = now
-    }
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  /**
+   * Retrieve a measured duration
+   */
+  def getDuration(key: String): Option[Long] = {
+    currentDurationsMs.get(key)
   }
 
   /**
    * Finalizes the query progress and adds it to list of recent status updates.
    *
    * @param hasNewData Whether the sources of this stream had new data for this trigger.
-   * @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that
-   *                    perform stateful aggregations with timeouts can still run batches even
-   *                    though the sources don't have any new data.
    */
-  protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
-    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
-      currentTriggerLatestOffsets != null)
+  def finishTrigger(
+    hasNewData: Boolean,
+    sourceToNumInputRowsMap: Map[SparkDataStream, Long],
+    lastExecution: IncrementalExecution,
+    lastEpochId: Long): Unit = {
+    assert(
+      currentTriggerStartOffsets != null && currentTriggerEndOffsets != null &&
+        currentTriggerLatestOffsets != null
+    )
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
-
-    val executionStats = extractExecutionStats(hasNewData, hasExecuted)
     val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
+    assert(lastExecution != null, "executed batch should provide the information for execution.")
+    val execStats = extractExecutionStats(hasNewData, sourceToNumInputRowsMap, lastExecution)
+    logDebug(s"Execution stats: $execStats")
+
+    val observedMetrics = extractObservedMetrics(lastExecution)
+    val newProgress = constructNewProgress(processingTimeMills, lastEpochId, Some(execStats),
+      observedMetrics)
+
+    progressReporter.lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+    progressReporter.updateProgress(newProgress)
+
+    // Update the value since this trigger executes a batch successfully.
+    this.execStatsOnLatestExecutedBatch = Some(execStats)
+
+    warnIfFinishTriggerTakesTooLong(currentTriggerEndTimestamp, processingTimeMills)
+
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  private def constructNewProgress(
+    processingTimeMills: Long,
+    batchId: Long,
+    execStats: Option[ExecutionStats],
+    observedMetrics: Map[String, Row]): StreamingQueryProgress = {
     val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND
 
     val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
       (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
     } else {
       Double.PositiveInfinity
     }
-    logDebug(s"Execution stats: $executionStats")
+    val sourceProgress = extractSourceProgress(execStats, inputTimeSec, processingTimeSec)
+    val sinkProgress = extractSinkProgress(execStats)
+
+    val eventTime = execStats.map { stats =>
+      stats.eventTimeStats.asJava
+    }.getOrElse(new java.util.HashMap)
 
-    val sourceProgress = sources.distinct.map { source =>
-      val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+    val stateOperators = execStats.map { stats =>
+      stats.stateOperators.toArray
+    }.getOrElse(Array[StateOperatorProgress]())
+
+    new StreamingQueryProgress(
+      id = id,
+      runId = runId,
+      name = name,
+      timestamp = progressReporter.formatTimestamp(currentTriggerStartTimestamp),
+      batchId = batchId,
+      batchDuration = processingTimeMills,
+      durationMs =
+        new java.util.HashMap(currentDurationsMs.toMap.transform((_, v) => long2Long(v)).asJava),
+      eventTime = new java.util.HashMap(eventTime),
+      stateOperators = stateOperators,
+      sources = sourceProgress.toArray,
+      sink = sinkProgress,
+      observedMetrics = new java.util.HashMap(observedMetrics.asJava))
+  }
+
+  private def extractSourceProgress(
+    execStats: Option[ExecutionStats],

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala:
##########
@@ -40,89 +41,172 @@ import org.apache.spark.util.Clock
 
 /**
  * Responsible for continually reporting statistics about the amount of data processed as well
- * as latency for a streaming query.  This trait is designed to be mixed into the
- * [[StreamExecution]], who is responsible for calling `startTrigger` and `finishTrigger`
- * at the appropriate times. Additionally, the status can updated with `updateStatusMessage` to
- * allow reporting on the streams current state (i.e. "Fetching more data").
+ * as latency for a streaming query.  This class is designed to hold information about
+ * a streaming query and contains methods that can be used on a streaming query,
+ * such as getting the most recent progress of the query.
  */
-trait ProgressReporter extends Logging {
+class ProgressReporter(
+    private val sparkSession: SparkSession,
+    private val triggerClock: Clock,
+    val logicalPlan: () => LogicalPlan)
+  extends Logging {
 
-  case class ExecutionStats(
-    inputRows: Map[SparkDataStream, Long],
-    stateOperators: Seq[StateOperatorProgress],
-    eventTimeStats: Map[String, String])
-
-  // Internal state of the stream, required for computing metrics.
-  protected def id: UUID
-  protected def runId: UUID
-  protected def name: String
-  protected def triggerClock: Clock
-  protected def logicalPlan: LogicalPlan
-  protected def lastExecution: QueryExecution
-  protected def newData: Map[SparkDataStream, LogicalPlan]
-  protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
-  protected def sources: Seq[SparkDataStream]
-  protected def sink: Table
+  // The timestamp we report an event that has not executed anything
+  var lastNoExecutionProgressEventTime = Long.MinValue
+
+  /** Holds the most recent query progress updates.  Accesses must lock on the queue itself. */
+  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+  val noDataProgressEventInterval: Long =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
+  private val timestampFormat =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+      .withZone(DateTimeUtils.getZoneId("UTC"))
+
+  /** Returns an array containing the most recent query progress updates. */
+  def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+    progressBuffer.toArray
+  }
+
+  /** Returns the most recent query progress update or null if there were no progress updates. */
+  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+    progressBuffer.lastOption.orNull
+  }
+
+  def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+    // Reset noDataEventTimestamp if we processed any data
+    lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
+
+    addNewProgress(newProgress)
+    postEvent(new QueryProgressEvent(newProgress))
+    logInfo(s"Streaming query made progress: $newProgress")
+  }
+
+  private def addNewProgress(newProgress: StreamingQueryProgress): Unit = {
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= sparkSession.sessionState.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+  }
+
+  def updateIdleness(
+    id: UUID,
+    runId: UUID,
+    currentTriggerStartTimestamp: Long,
+    newProgress: StreamingQueryProgress): Unit = {
+    val now = triggerClock.getTimeMillis()
+    if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
+      addNewProgress(newProgress)
+      if (lastNoExecutionProgressEventTime > Long.MinValue) {
+        postEvent(new QueryIdleEvent(id, runId, formatTimestamp(currentTriggerStartTimestamp)))
+        logInfo(s"Streaming query has been idle and waiting for new data more than " +
+          s"${noDataProgressEventInterval} ms.")
+      }
+
+      lastNoExecutionProgressEventTime = now
+    }
+  }
+
+  private def postEvent(event: StreamingQueryListener.Event): Unit = {
+    sparkSession.streams.postListenerEvent(event)
+  }
+
+  def formatTimestamp(millis: Long): String = {
+    Instant.ofEpochMilli(millis)
+      .atZone(ZoneId.of("Z")).format(timestampFormat)
+  }
+}
+
+/**
+ * This class holds variables and methods that are used to track metrics and progress
+ * during the execution lifecycle of a batch that is being processed by the streaming query
+ */
+abstract class ProgressContext(
+  id: UUID,

Review Comment:
   nit: 2 more spaces
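
   Taken together, the hunk above splits responsibilities roughly as follows
   (a sketch, assuming the final layout matches this diff): ProgressReporter
   owns query-lifetime progress state, while ProgressContext holds the
   per-trigger tracking variables. Reporter-side usage, restricted to members
   visible in the diff:

       // Query-lifetime reporter with a thread-safe, bounded progress buffer.
       val reporter = new ProgressReporter(sparkSession, triggerClock, () => logicalPlan)
       reporter.updateProgress(newProgress)  // appends to the buffer, posts QueryProgressEvent
       val latest = reporter.lastProgress    // most recent update, or null if none yet
       val recent = reporter.recentProgress  // retention bounded by streamingProgressRetention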



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -398,24 +447,28 @@ class MicroBatchExecution(
    *    Identify a brand new batch
    *  DONE
    */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
-    sinkCommitProgress = None
+  protected def populateStartOffsets(
+    execCtx: MicroBatchExecutionContext,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -463,34 +516,38 @@ class MicroBatchExecution(
             }
           case None => logInfo("no commit log present")
         }
-        logInfo(s"Resuming at batch $currentBatchId with committed offsets " +
-          s"$committedOffsets and available offsets $availableOffsets")
+        // initialize committed offsets to start offsets of the most recent committed batch
+        committedOffsets = execCtx.startOffsets
+        logInfo(s"Resuming at batch ${execCtx.batchId} with committed offsets " +
+          s"${execCtx.startOffsets} and available offsets ${execCtx.endOffsets}")
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
-        currentBatchId = 0
+        execCtx.batchId = 0
         watermarkTracker = WatermarkTracker(sparkSessionToRunBatches.conf)
     }
   }
 
   /**
    * Returns true if there is any new data available to be processed.
    */
-  private def isNewDataAvailable: Boolean = {
-    availableOffsets.exists {
+  private def isNewDataAvailable(execCtx: MicroBatchExecutionContext): Boolean = {
+    execCtx.endOffsets.exists {
       case (source, available) =>
-        committedOffsets
+        execCtx.startOffsets
           .get(source)
           .map(committed => committed != available)
           .getOrElse(true)
     }
   }
 
   /**
-   * Get the startOffset from availableOffsets. This is to be used in
+   * Get the startOffset from endOffsets. This is to be used in
    * latestOffset(startOffset, readLimit)
    */
-  private def getStartOffset(dataStream: SparkDataStream): OffsetV2 = {
-    val startOffsetOpt = availableOffsets.get(dataStream)
+  private def getStartOffset(
+    execCtx: MicroBatchExecutionContext,

Review Comment:
   nit: 2 more spaces



Re: [PR] [SPARK-47052][SS] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45109:
URL: https://github.com/apache/spark/pull/45109#issuecomment-1953519694

   Could you please sync the master branch of your fork with the latest from the OSS repo, and rebase your PR against the latest master as well? That may resolve the linter failures the CI is showing.


Re: [PR] [SPARK-47052][SS] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45109:
URL: https://github.com/apache/spark/pull/45109#issuecomment-1955844741

   Thanks! Merging to master.


Re: [PR] [SPARK-47052][SS] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]

Posted by "jerrypeng (via GitHub)" <gi...@apache.org>.
jerrypeng commented on PR #45109:
URL: https://github.com/apache/spark/pull/45109#issuecomment-1955422916

   @HeartSaVioR thanks for your review!  I have addressed your comments. PTAL!

