You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/27 06:47:55 UTC

[GitHub] [spark] ivoson commented on a change in pull request #24537: [SPARK-23887][SS] continuous query progress reporting

ivoson commented on a change in pull request #24537: [SPARK-23887][SS] continuous query progress reporting
URL: https://github.com/apache/spark/pull/24537#discussion_r287658210
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ##########
 @@ -326,36 +329,39 @@ class ContinuousExecution(
    * Mark the specified epoch as committed. All readers must have reported end offsets for the epoch
    * before this is called.
    */
-  def commit(epoch: Long): Unit = {
+  def commit(epoch: Long, epochStats: EpochStats): Unit = {
     updateStatusMessage(s"Committing epoch $epoch")
 
     assert(sources.length == 1, "only one continuous source supported currently")
     assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
 
-    synchronized {
-      // Record offsets before updating `committedOffsets`
-      recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
-      if (queryExecutionThread.isAlive) {
-        commitLog.add(epoch, CommitMetadata())
-        val offset =
-          sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
-        committedOffsets ++= Seq(sources(0) -> offset)
-        sources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
-      } else {
-        return
+    reportTimeTaken("walCommit", epoch) {
+      synchronized {
+        // Record offsets before updating `committedOffsets`
+        recordTriggerOffsets(from = committedOffsets, to = availableOffsets, epoch)
 
 Review comment:
   `availableOffsets` never seems to be updated in continuous processing; maybe we should take this into consideration when reporting metrics.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org