You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/08 08:21:59 UTC

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1043003950


##########
core/src/main/scala/org/apache/spark/util/ThreadUtils.scala:
##########
@@ -167,6 +167,27 @@ private[spark] object ThreadUtils {
     Executors.newFixedThreadPool(1, threadFactory).asInstanceOf[ThreadPoolExecutor]
   }
 
+  /**
+   * Wrapper over newSingleThreadExecutor that allows the specification
+   * of a RejectedExecutionHandler
+   */
+  def newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+    threadName: String,

Review Comment:
   nit: please use 4-space indentation for parameter definitions.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2014,7 +2014,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-

Review Comment:
   nit: unnecessary change



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -342,17 +342,14 @@ class MicroBatchExecution(
         isCurrentBatchConstructed = true
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
-         * is the second latest batch id in the offset log. */
-        if (latestBatchId != 0) {
-          val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
-            logError(s"The offset log for batch ${latestBatchId - 1} doesn't exist, " +
-              s"which is required to restart the query from the latest batch $latestBatchId " +
-              "from the offset log. Please ensure there are two subsequent offset logs " +
-              "available for the latest batch via manually deleting the offset file(s). " +
-              "Please also ensure the latest batch for commit log is equal or one batch " +
-              "earlier than the latest batch for offset log.")
-            throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
-          }
+         * is the second latest batch id in the offset log.
+         * The offset log may not be contiguous */
+        val prevBatchId = offsetLog.getPrevBatchFromStorage(latestBatchId)
+        if (latestBatchId != 0 && prevBatchId.isDefined) {
+            val secondLatestOffsets = offsetLog.get(prevBatchId.get).getOrElse({

Review Comment:
   nit: indentation looks to be off



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -148,6 +148,24 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
+  /**
+   * Get the id of the previous batch from storage

Review Comment:
   nit: We don't require filling in every part of the doc comment with meaningless info. Please remove any parts that you don't feel are helpful, or that merely restate the obvious just to fill out the form.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -342,17 +342,14 @@ class MicroBatchExecution(
         isCurrentBatchConstructed = true
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
-         * is the second latest batch id in the offset log. */
-        if (latestBatchId != 0) {
-          val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
-            logError(s"The offset log for batch ${latestBatchId - 1} doesn't exist, " +
-              s"which is required to restart the query from the latest batch $latestBatchId " +
-              "from the offset log. Please ensure there are two subsequent offset logs " +
-              "available for the latest batch via manually deleting the offset file(s). " +
-              "Please also ensure the latest batch for commit log is equal or one batch " +
-              "earlier than the latest batch for offset log.")
-            throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
-          }
+         * is the second latest batch id in the offset log.

Review Comment:
   I'm actually in favor of limiting the change to the async progress tracking implementation, in the same way that we are adding protected methods for extension.
   
   Do we have a goal of supporting a smooth transition between normal microbatch execution and async progress tracking for a single query? If so, we should have a clear explanation of the semantics and behavior during the transition between the two (in both directions). Otherwise, I'd rather leave normal microbatch execution working the same as before.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -727,18 +719,56 @@ class MicroBatchExecution(
 
     withProgressLocked {
       sinkCommitProgress = batchSinkProgress
-      watermarkTracker.updateWatermark(lastExecution.executedPlan)
-      reportTimeTaken("commitOffsets") {
-        assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)),
-          "Concurrent update to the commit log. Multiple streaming jobs detected for " +
-            s"$currentBatchId")
-      }
-      committedOffsets ++= availableOffsets
+      markMicroBatchEnd()
     }
     logDebug(s"Completed batch ${currentBatchId}")
   }
 
-  /** Execute a function while locking the stream from making an progress */
+
+  /**
+   * Called at the start of the micro batch with given offsets. It takes care of offset
+   * checkpointing to offset log and any microbatch startup tasks.
+   */
+  protected def markMicroBatchStart(): Unit = {
+    assert(offsetLog.add(currentBatchId,
+      availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
+      s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
+    logInfo(s"Committed offsets for batch $currentBatchId. " +
+      s"Metadata ${offsetSeqMetadata.toString}")
+  }
+
+  /**
+   * Method called once after the planning is done and before the start of the microbatch execution.
+   * It can be used to perform any pre-execution tasks.
+   */
+  protected def markMicroBatchExecutionStart(): Unit = {}
+
+  /**
+   * Called after the microbatch has completed execution. It takes care of committing the offset
+   * to commit log and other bookkeeping.
+   */
+  protected def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)),
+        "Concurrent update to the commit log. Multiple streaming jobs detected for " +
+          s"$currentBatchId")
+    }
+    committedOffsets ++= availableOffsets
+  }
+
+  protected def cleanUpLastExecutedMicroBatch(): Unit = {
+    if (currentBatchId != 0) {
+      val prevBatchOff = offsetLog.get(currentBatchId - 1)
+      if (prevBatchOff.isDefined) {
+        commitSources(prevBatchOff.get)
+      } else {
+        throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
+      }
+    }
+  }
+
+    /** Execute a function while locking the stream from making an progress */

Review Comment:
   nit: indentation



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"
+      )
+      .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+    sparkSession: SparkSession,
+    trigger: Trigger,
+    triggerClock: Clock,
+    extraOptions: Map[String, String],
+    plan: WriteToStream)
+    extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+    .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+    "async-log-write",
+    2, // one for offset commit and one for completion commit
+    new RejectedExecutionHandler() {
+      override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
+        try {
+          if (!executor.isShutdown) {
+            val start = System.currentTimeMillis()
+            executor.getQueue.put(r)
+            logDebug(
+              s"Async write paused execution for " +
+                s"${System.currentTimeMillis() - start} due to task queue being full."
+            )
+          }
+        } catch {
+          case e: InterruptedException =>
+            Thread.currentThread.interrupt()
+            throw new RejectedExecutionException("Producer interrupted", e)
+          case e: Throwable =>
+            logError("Encountered error in async write executor service", e)
+            errorNotifier.markError(e)
+        }
+      }
+    })
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+    sparkSession,
+    checkpointFile("offsets"),
+    asyncWritesExecutorService,
+    asyncProgressTrackingCheckpointingIntervalMs,
+    clock = triggerClock
+  )
+
+  override val commitLog =
+    new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+    // check if pipeline is stateful
+    checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+    // this is a no op for async progress tracking since we only want to commit sources only
+    // after the offset WAL commit has be successfully written
+  }
+
+  /**
+   * Should not call super method as we need to do something completely different
+   * in this method for async progress tracking
+   */
+  override def markMicroBatchStart(): Unit = {
+    // Because we are using a thread pool with only one thread, async writes to the offset log
+    // are still written in a serial / in order fashion
+    offsetLog
+      .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
+      .thenAccept(tuple => {
+        val (batchId, persistedToDurableStorage) = tuple
+        if (persistedToDurableStorage) {
+
+          // batch id cache not initialized
+          if (lastBatchPersistedToDurableStorage.get == -1) {
+            lastBatchPersistedToDurableStorage.set(
+              offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1))
+          }
+
+          if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
+            // sanity check to make sure batch ids are monotonically increasing
+            assert(lastBatchPersistedToDurableStorage.get < batchId)
+            val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get())
+            if (prevBatchOff.isDefined) {
+              // Offset is ready to be committed by the source. Add to queue
+              sourceCommitQueue.add(prevBatchOff.get)
+            } else {
+              throw new IllegalStateException(
+                s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist"
+              )
+            }
+          }
+          lastBatchPersistedToDurableStorage.set(batchId)
+        }
+      })
+      .exceptionally((th: Throwable) => {
+        logError("Encountered error while performing async offset write", th)
+        errorNotifier.markError(th)
+        return
+      })
+
+    // check if there are offsets that are ready to be committed by the source
+    var offset = sourceCommitQueue.poll()
+    while (offset != null) {
+      commitSources(offset)
+      offset = sourceCommitQueue.poll()
+    }
+  }
+
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
+        commitLog
+          .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+          .exceptionally((th: Throwable) => {
+            logError("Got exception during async write", th)
+            errorNotifier.markError(th)
+            return
+          })
+      } else {
+        if (!commitLog.addInMemory(
+          currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId"
+          )
+        }
+      }
+      offsetLog.removeAsyncOffsetWrite(currentBatchId)
+    }
+    committedOffsets ++= availableOffsets
+  }
+
+  // need to look at the number of files on disk
+  override def purge(threshold: Long): Unit = {
+    while (offsetLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      offsetLog.writtenToDurableStorage.poll()
+    }
+    offsetLog.purge(offsetLog.writtenToDurableStorage.peek())
+
+    while (commitLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      commitLog.writtenToDurableStorage.poll()
+    }
+    commitLog.purge(commitLog.writtenToDurableStorage.peek())
+  }
+
+  override def cleanup(): Unit = {
+    super.cleanup()
+
+    ThreadUtils.shutdown(asyncWritesExecutorService)
+    logInfo(s"Async progress tracking executor pool for query ${prettyIdString} has been shutdown")
+  }
+
+  // used for testing
+  def areWritesPendingOrInProgress(): Boolean = {
+    asyncWritesExecutorService.getQueue.size() > 0 || asyncWritesExecutorService.getActiveCount > 0
+  }
+
+  private def validateAndGetTrigger(): TriggerExecutor = {
+    // validate that the pipeline is using a supported sink
+    if (!extraOptions
+      .get(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK

Review Comment:
   nit: why not import `AsyncProgressTrackingMicroBatchExecution._` at the beginning of the class definition and remove the redundant `AsyncProgressTrackingMicroBatchExecution.` qualifiers?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"
+      )
+      .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+    sparkSession: SparkSession,
+    trigger: Trigger,
+    triggerClock: Clock,
+    extraOptions: Map[String, String],
+    plan: WriteToStream)
+    extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+    .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+    "async-log-write",
+    2, // one for offset commit and one for completion commit
+    new RejectedExecutionHandler() {
+      override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
+        try {
+          if (!executor.isShutdown) {
+            val start = System.currentTimeMillis()
+            executor.getQueue.put(r)
+            logDebug(
+              s"Async write paused execution for " +
+                s"${System.currentTimeMillis() - start} due to task queue being full."
+            )
+          }
+        } catch {
+          case e: InterruptedException =>
+            Thread.currentThread.interrupt()
+            throw new RejectedExecutionException("Producer interrupted", e)
+          case e: Throwable =>
+            logError("Encountered error in async write executor service", e)
+            errorNotifier.markError(e)
+        }
+      }
+    })
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+    sparkSession,
+    checkpointFile("offsets"),
+    asyncWritesExecutorService,
+    asyncProgressTrackingCheckpointingIntervalMs,
+    clock = triggerClock
+  )
+
+  override val commitLog =
+    new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+    // check if pipeline is stateful
+    checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+    // this is a no op for async progress tracking since we only want to commit sources only
+    // after the offset WAL commit has be successfully written
+  }
+
+  /**
+   * Should not call super method as we need to do something completely different
+   * in this method for async progress tracking
+   */
+  override def markMicroBatchStart(): Unit = {
+    // Because we are using a thread pool with only one thread, async writes to the offset log
+    // are still written in a serial / in order fashion
+    offsetLog
+      .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
+      .thenAccept(tuple => {
+        val (batchId, persistedToDurableStorage) = tuple
+        if (persistedToDurableStorage) {
+
+          // batch id cache not initialized
+          if (lastBatchPersistedToDurableStorage.get == -1) {
+            lastBatchPersistedToDurableStorage.set(
+              offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1))
+          }
+
+          if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
+            // sanity check to make sure batch ids are monotonically increasing
+            assert(lastBatchPersistedToDurableStorage.get < batchId)
+            val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get())
+            if (prevBatchOff.isDefined) {
+              // Offset is ready to be committed by the source. Add to queue
+              sourceCommitQueue.add(prevBatchOff.get)
+            } else {
+              throw new IllegalStateException(
+                s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist"
+              )
+            }
+          }
+          lastBatchPersistedToDurableStorage.set(batchId)
+        }
+      })
+      .exceptionally((th: Throwable) => {
+        logError("Encountered error while performing async offset write", th)
+        errorNotifier.markError(th)
+        return
+      })
+
+    // check if there are offsets that are ready to be committed by the source
+    var offset = sourceCommitQueue.poll()
+    while (offset != null) {
+      commitSources(offset)
+      offset = sourceCommitQueue.poll()
+    }
+  }
+
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
+        commitLog
+          .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+          .exceptionally((th: Throwable) => {
+            logError("Got exception during async write", th)
+            errorNotifier.markError(th)
+            return
+          })
+      } else {
+        if (!commitLog.addInMemory(
+          currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId"
+          )
+        }
+      }
+      offsetLog.removeAsyncOffsetWrite(currentBatchId)
+    }
+    committedOffsets ++= availableOffsets
+  }
+
+  // need to look at the number of files on disk
+  override def purge(threshold: Long): Unit = {
+    while (offsetLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      offsetLog.writtenToDurableStorage.poll()
+    }
+    offsetLog.purge(offsetLog.writtenToDurableStorage.peek())
+
+    while (commitLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      commitLog.writtenToDurableStorage.poll()
+    }
+    commitLog.purge(commitLog.writtenToDurableStorage.peek())
+  }
+
+  override def cleanup(): Unit = {
+    super.cleanup()
+
+    ThreadUtils.shutdown(asyncWritesExecutorService)
+    logInfo(s"Async progress tracking executor pool for query ${prettyIdString} has been shutdown")
+  }
+
+  // used for testing
+  def areWritesPendingOrInProgress(): Boolean = {
+    asyncWritesExecutorService.getQueue.size() > 0 || asyncWritesExecutorService.getActiveCount > 0
+  }
+
+  private def validateAndGetTrigger(): TriggerExecutor = {
+    // validate that the pipeline is using a supported sink
+    if (!extraOptions
+      .get(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK
+      )
+      .getOrElse("false")
+      .toBoolean) {
+      try {
+        plan.sink.name() match {
+          case "noop-table" =>

Review Comment:
   Let's not forget to describe which sink we support in the guide doc.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -300,10 +322,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-
   /**
-   * List the available batches on file system. As a workaround for S3 inconsistent list, it also
-   * tries to take `batchCache` into consideration to infer a better answer.
+   * List the available batches on file system

Review Comment:
   nit: Let's use a one-line comment: `/** List the available batches on file system. */`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"
+      )
+      .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+    sparkSession: SparkSession,
+    trigger: Trigger,
+    triggerClock: Clock,
+    extraOptions: Map[String, String],
+    plan: WriteToStream)
+    extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+    .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+    "async-log-write",
+    2, // one for offset commit and one for completion commit
+    new RejectedExecutionHandler() {
+      override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
+        try {
+          if (!executor.isShutdown) {
+            val start = System.currentTimeMillis()
+            executor.getQueue.put(r)
+            logDebug(
+              s"Async write paused execution for " +
+                s"${System.currentTimeMillis() - start} due to task queue being full."
+            )
+          }
+        } catch {
+          case e: InterruptedException =>
+            Thread.currentThread.interrupt()
+            throw new RejectedExecutionException("Producer interrupted", e)
+          case e: Throwable =>
+            logError("Encountered error in async write executor service", e)
+            errorNotifier.markError(e)
+        }
+      }
+    })
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+    sparkSession,
+    checkpointFile("offsets"),
+    asyncWritesExecutorService,
+    asyncProgressTrackingCheckpointingIntervalMs,
+    clock = triggerClock
+  )
+
+  override val commitLog =
+    new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+    // check if pipeline is stateful
+    checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+    // this is a no op for async progress tracking since we only want to commit sources only
+    // after the offset WAL commit has be successfully written
+  }
+
+  /**
+   * Should not call super method as we need to do something completely different
+   * in this method for async progress tracking
+   */
+  override def markMicroBatchStart(): Unit = {
+    // Because we are using a thread pool with only one thread, async writes to the offset log
+    // are still written in a serial / in order fashion
+    offsetLog
+      .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
+      .thenAccept(tuple => {
+        val (batchId, persistedToDurableStorage) = tuple
+        if (persistedToDurableStorage) {
+
+          // batch id cache not initialized
+          if (lastBatchPersistedToDurableStorage.get == -1) {
+            lastBatchPersistedToDurableStorage.set(
+              offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1))
+          }
+
+          if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
+            // sanity check to make sure batch ids are monotonically increasing
+            assert(lastBatchPersistedToDurableStorage.get < batchId)
+            val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get())
+            if (prevBatchOff.isDefined) {
+              // Offset is ready to be committed by the source. Add to queue
+              sourceCommitQueue.add(prevBatchOff.get)
+            } else {
+              throw new IllegalStateException(
+                s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist"
+              )
+            }
+          }
+          lastBatchPersistedToDurableStorage.set(batchId)
+        }
+      })
+      .exceptionally((th: Throwable) => {
+        logError("Encountered error while performing async offset write", th)
+        errorNotifier.markError(th)
+        return
+      })
+
+    // check if there are offsets that are ready to be committed by the source
+    var offset = sourceCommitQueue.poll()
+    while (offset != null) {
+      commitSources(offset)
+      offset = sourceCommitQueue.poll()
+    }
+  }
+
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
+        commitLog
+          .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+          .exceptionally((th: Throwable) => {
+            logError("Got exception during async write", th)
+            errorNotifier.markError(th)
+            return
+          })
+      } else {
+        if (!commitLog.addInMemory(
+          currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId"
+          )
+        }
+      }
+      offsetLog.removeAsyncOffsetWrite(currentBatchId)
+    }
+    committedOffsets ++= availableOffsets
+  }
+
+  // need to look at the number of files on disk
+  override def purge(threshold: Long): Unit = {
+    while (offsetLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      offsetLog.writtenToDurableStorage.poll()
+    }
+    offsetLog.purge(offsetLog.writtenToDurableStorage.peek())
+
+    while (commitLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      commitLog.writtenToDurableStorage.poll()
+    }
+    commitLog.purge(commitLog.writtenToDurableStorage.peek())
+  }
+
+  override def cleanup(): Unit = {
+    super.cleanup()
+
+    ThreadUtils.shutdown(asyncWritesExecutorService)
+    logInfo(s"Async progress tracking executor pool for query ${prettyIdString} has been shutdown")
+  }
+
+  // used for testing
+  def areWritesPendingOrInProgress(): Boolean = {
+    asyncWritesExecutorService.getQueue.size() > 0 || asyncWritesExecutorService.getActiveCount > 0
+  }
+
+  private def validateAndGetTrigger(): TriggerExecutor = {
+    // validate that the pipeline is using a supported sink
+    if (!extraOptions
+      .get(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK
+      )
+      .getOrElse("false")
+      .toBoolean) {
+      try {
+        plan.sink.name() match {
+          case "noop-table" =>
+          case "console" =>
+          case "MemorySink" =>
+          case "KafkaTable" =>
+          case _ =>
+            throw new IllegalArgumentException(
+              s"Sink ${plan.sink.name()}" +
+                s" does not support async progress tracking"
+            )
+        }
+      } catch {
+        case e: IllegalStateException =>
+          // sink does not implement name() method
+          if (e.getMessage.equals("should not be called.")) {

Review Comment:
   Is it really necessary to check the message against a string literal? Consider when we would get an IllegalStateException here: it would be thrown by name() itself, meaning we have no way to get the sink's name anyway.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"
+      )
+      .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+    sparkSession: SparkSession,
+    trigger: Trigger,
+    triggerClock: Clock,
+    extraOptions: Map[String, String],
+    plan: WriteToStream)
+    extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+    .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+    "async-log-write",
+    2, // one for offset commit and one for completion commit
+    new RejectedExecutionHandler() {
+      override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
+        try {
+          if (!executor.isShutdown) {
+            val start = System.currentTimeMillis()
+            executor.getQueue.put(r)
+            logDebug(
+              s"Async write paused execution for " +
+                s"${System.currentTimeMillis() - start} due to task queue being full."
+            )
+          }
+        } catch {
+          case e: InterruptedException =>
+            Thread.currentThread.interrupt()
+            throw new RejectedExecutionException("Producer interrupted", e)
+          case e: Throwable =>
+            logError("Encountered error in async write executor service", e)
+            errorNotifier.markError(e)
+        }
+      }
+    })
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+    sparkSession,
+    checkpointFile("offsets"),
+    asyncWritesExecutorService,
+    asyncProgressTrackingCheckpointingIntervalMs,
+    clock = triggerClock
+  )
+
+  override val commitLog =
+    new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+    // check if pipeline is stateful
+    checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+    // this is a no op for async progress tracking since we only want to commit sources only
+    // after the offset WAL commit has be successfully written
+  }
+
+  /**
+   * Should not call super method as we need to do something completely different
+   * in this method for async progress tracking
+   */
+  override def markMicroBatchStart(): Unit = {
+    // Because we are using a thread pool with only one thread, async writes to the offset log
+    // are still written in a serial / in order fashion
+    offsetLog
+      .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
+      .thenAccept(tuple => {
+        val (batchId, persistedToDurableStorage) = tuple
+        if (persistedToDurableStorage) {
+
+          // batch id cache not initialized
+          if (lastBatchPersistedToDurableStorage.get == -1) {
+            lastBatchPersistedToDurableStorage.set(
+              offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1))
+          }
+
+          if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
+            // sanity check to make sure batch ids are monotonically increasing
+            assert(lastBatchPersistedToDurableStorage.get < batchId)
+            val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get())
+            if (prevBatchOff.isDefined) {
+              // Offset is ready to be committed by the source. Add to queue
+              sourceCommitQueue.add(prevBatchOff.get)
+            } else {
+              throw new IllegalStateException(
+                s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist"
+              )
+            }
+          }
+          lastBatchPersistedToDurableStorage.set(batchId)
+        }
+      })
+      .exceptionally((th: Throwable) => {
+        logError("Encountered error while performing async offset write", th)
+        errorNotifier.markError(th)
+        return
+      })
+
+    // check if there are offsets that are ready to be committed by the source
+    var offset = sourceCommitQueue.poll()
+    while (offset != null) {
+      commitSources(offset)
+      offset = sourceCommitQueue.poll()
+    }
+  }
+
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
+        commitLog
+          .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+          .exceptionally((th: Throwable) => {
+            logError("Got exception during async write", th)
+            errorNotifier.markError(th)
+            return
+          })
+      } else {
+        if (!commitLog.addInMemory(
+          currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId"
+          )
+        }
+      }
+      offsetLog.removeAsyncOffsetWrite(currentBatchId)
+    }
+    committedOffsets ++= availableOffsets
+  }
+
+  // need to look at the number of files on disk
+  override def purge(threshold: Long): Unit = {
+    while (offsetLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      offsetLog.writtenToDurableStorage.poll()
+    }
+    offsetLog.purge(offsetLog.writtenToDurableStorage.peek())
+
+    while (commitLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      commitLog.writtenToDurableStorage.poll()
+    }
+    commitLog.purge(commitLog.writtenToDurableStorage.peek())
+  }
+
+  override def cleanup(): Unit = {
+    super.cleanup()
+
+    ThreadUtils.shutdown(asyncWritesExecutorService)
+    logInfo(s"Async progress tracking executor pool for query ${prettyIdString} has been shutdown")
+  }
+
+  // used for testing
+  def areWritesPendingOrInProgress(): Boolean = {
+    asyncWritesExecutorService.getQueue.size() > 0 || asyncWritesExecutorService.getActiveCount > 0
+  }
+
+  private def validateAndGetTrigger(): TriggerExecutor = {
+    // validate that the pipeline is using a supported sink
+    if (!extraOptions
+      .get(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK
+      )
+      .getOrElse("false")
+      .toBoolean) {
+      try {
+        plan.sink.name() match {
+          case "noop-table" =>
+          case "console" =>
+          case "MemorySink" =>
+          case "KafkaTable" =>
+          case _ =>
+            throw new IllegalArgumentException(
+              s"Sink ${plan.sink.name()}" +
+                s" does not support async progress tracking"
+            )
+        }
+      } catch {
+        case e: IllegalStateException =>
+          // sink does not implement name() method
+          if (e.getMessage.equals("should not be called.")) {
+            throw new IllegalArgumentException(
+              s"Sink ${plan.sink}" +
+                s" does not support async progress tracking"
+            )
+          } else {
+            throw e
+          }
+      }
+    }
+
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger =>
+        throw new IllegalArgumentException(
+          "Async progress tracking cannot be used with Once trigger")
+      case AvailableNowTrigger =>
+        throw new IllegalArgumentException(
+          "Async progress tracking cannot be used with AvailableNow trigger"
+        )
+      case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+    }
+  }
+
+  private def checkNotStatefulPipeline: Unit = {
+    if (isFirstBatch) {
+      lastExecution.executedPlan.collect {
+        case p if p.isInstanceOf[StateStoreWriter] =>
+          throw new IllegalArgumentException(
+            "Stateful streaming queries does not support async progress tracking at this moment."
+          )
+          isFirstBatch = false

Review Comment:
   This is not reachable: the `throw` just above always exits the case body, so `isFirstBatch = false` is never executed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org