Posted to commits@spark.apache.org by ka...@apache.org on 2022/11/02 13:24:38 UTC

[spark] branch master updated: [SPARK-40957] Add in memory cache in HDFSMetadataLog

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fa2c13cbf8 [SPARK-40957] Add in memory cache in HDFSMetadataLog
5fa2c13cbf8 is described below

commit 5fa2c13cbf83c6c4c040f15bbbf66dbe49581bdc
Author: Jerry Peng <je...@databricks.com>
AuthorDate: Wed Nov 2 22:24:16 2022 +0900

    [SPARK-40957] Add in memory cache in HDFSMetadataLog
    
    ### What changes were proposed in this pull request?
    
    Every time an entry in the offset log or commit log needs to be accessed, we read it from disk, which is slow. A cache of recent entries can speed up these reads.
    
    There is already a caching mechanism implemented in OffsetSeqLog. Let's replace it with an implementation in HDFSMetadataLog (the parent class) so that both the offset log and the commit log can be served from an in-memory cache.
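    
    As a rough sketch of the caching approach (the standalone wrapper and its names here are illustrative only; the actual patch embeds the map directly in HDFSMetadataLog), the cache is a synchronized `LinkedHashMap` keyed by batch id that keeps only the latest two entries:
    
    ```scala
    import java.util.{Collections, LinkedHashMap => JLinkedHashMap}
    
    // Illustrative size-bounded cache for the latest two batches of metadata.
    class TwoBatchCache[T <: AnyRef] {
      private val cache = Collections.synchronizedMap(new JLinkedHashMap[Long, T](2) {
        // Evict the eldest batch once more than two entries are held.
        override def removeEldestEntry(e: java.util.Map.Entry[Long, T]): Boolean = size > 2
      })
    
      def put(batchId: Long, metadata: T): Unit = cache.put(batchId, metadata)
      def get(batchId: Long): Option[T] = Option(cache.get(batchId))
    }
    ```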
    
    ### Why are the changes needed?
    
    Improve read speeds for entries in the offset log and commit log.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests should suffice
    
    Closes #38430 from jerrypeng/SPARK-40957.
    
    Authored-by: Jerry Peng <je...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   8 ++
 .../sql/execution/streaming/HDFSMetadataLog.scala  | 112 ++++++++++++++-------
 .../sql/execution/streaming/OffsetSeqLog.scala     |  18 ----
 3 files changed, 85 insertions(+), 53 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index abe9df8dd87..0f3dc3cf44c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2007,6 +2007,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_METADATA_CACHE_ENABLED =
+    buildConf("spark.sql.streaming.metadataCache.enabled")
+      .internal()
+      .doc("Whether the streaming HDFSMetadataLog caches the metadata of the latest two batches.")
+      .booleanConf
+      .createWithDefault(true)
+
+
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
       .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 8a037b55168..1d444655548 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io._
 import java.nio.charset.StandardCharsets
+import java.util.{Collections, LinkedHashMap => JLinkedHashMap}
 
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
 import org.apache.commons.io.IOUtils
@@ -30,6 +32,7 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
 
 
 /**
@@ -64,6 +67,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     fileManager.mkdirs(metadataPath)
   }
 
+  protected val metadataCacheEnabled: Boolean
+  = sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_METADATA_CACHE_ENABLED)
+
+  /**
+   * Cache the latest two batches. [[StreamExecution]] usually just accesses the latest two batches
+   * when committing offsets, so this cache will save some file system operations.
+   */
+  protected[sql] val batchCache = Collections.synchronizedMap(new JLinkedHashMap[Long, T](2) {
+    override def removeEldestEntry(e: java.util.Map.Entry[Long, T]): Boolean = size > 2
+  })
+
   /**
    * A `PathFilter` to filter only batch files
    */
@@ -113,10 +127,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    */
   override def add(batchId: Long, metadata: T): Boolean = {
     require(metadata != null, "'null' metadata cannot written to a metadata log")
-    addNewBatchByStream(batchId) { output => serialize(metadata, output) }
+    val res = addNewBatchByStream(batchId) { output => serialize(metadata, output) }
+    if (metadataCacheEnabled && res) batchCache.put(batchId, metadata)
+    res
   }
 
   override def get(batchId: Long): Option[T] = {
+    if (metadataCacheEnabled && batchCache.containsKey(batchId)) {
+      val metadata = batchCache.get(batchId)
+      assert(metadata != null)
+      return Some(metadata)
+    }
+
     try {
       applyFnToBatchByStream(batchId) { input => Some(deserialize(input)) }
     } catch {
@@ -135,9 +157,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * NOTE: This no longer fails early on corruption. The caller should handle the exception
    * properly and make sure the logic is not affected by failing in the middle.
    */
-  def applyFnToBatchByStream[RET](batchId: Long)(fn: InputStream => RET): RET = {
+  def applyFnToBatchByStream[RET](
+      batchId: Long, skipExistingCheck: Boolean = false)(fn: InputStream => RET): RET = {
     val batchMetadataFile = batchIdToPath(batchId)
-    if (fileManager.exists(batchMetadataFile)) {
+    if (skipExistingCheck || fileManager.exists(batchMetadataFile)) {
       val input = fileManager.open(batchMetadataFile)
       try {
         fn(input)
@@ -168,7 +191,13 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * valid behavior, we still need to prevent it from destroying the files.
    */
   def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = {
-    get(batchId).map(_ => false).getOrElse {
+
+    val batchMetadataFile = batchIdToPath(batchId)
+
+    if ((metadataCacheEnabled && batchCache.containsKey(batchId))
+      || fileManager.exists(batchMetadataFile)) {
+      false
+    } else {
       // Only write metadata when the batch has not yet been written
       val output = fileManager.createAtomic(batchIdToPath(batchId), overwriteIfPossible = false)
       try {
@@ -188,42 +217,32 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
+  private def getExistingBatch(batchId: Long): T = {
+    val metadata = batchCache.get(batchId)
+    if (metadata == null) {
+      applyFnToBatchByStream(batchId, skipExistingCheck = true) { input => deserialize(input) }
+    } else {
+      metadata
+    }
+  }
+
   override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
     assert(startId.isEmpty || endId.isEmpty || startId.get <= endId.get)
-    val files = fileManager.list(metadataPath, batchFilesFilter)
-    val batchIds = files
-      .map(f => pathToBatchId(f.getPath))
-      .filter { batchId =>
-        (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
+    val batchIds = listBatches.filter { batchId =>
+      (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
     }.sorted
 
     HDFSMetadataLog.verifyBatchIds(batchIds, startId, endId)
-
-    batchIds.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
-      case (batchId, metadataOption) =>
-        (batchId, metadataOption.get)
-    }
+    batchIds.map(batchId => (batchId, getExistingBatch(batchId)))
   }
 
-  /**
-   * Return the latest batch Id without reading the file. This method only checks for existence of
-   * file to avoid cost on reading and deserializing log file.
-   */
-  def getLatestBatchId(): Option[Long] = {
-    fileManager.list(metadataPath, batchFilesFilter)
-      .map(f => pathToBatchId(f.getPath))
-      .sorted(Ordering.Long.reverse)
-      .headOption
-  }
+  /** Return the latest batch id without reading the file. */
+  def getLatestBatchId(): Option[Long] = listBatches.sorted.lastOption
 
   override def getLatest(): Option[(Long, T)] = {
-    getLatestBatchId().map { batchId =>
-      val content = get(batchId).getOrElse {
-        // If we find the last batch file, we must read that file, other than failing back to
-        // old batches.
-        throw new IllegalStateException(s"failed to read log file for batch $batchId")
-      }
-      (batchId, content)
+    listBatches.sorted.lastOption.map { batchId =>
+      logInfo(s"Getting latest batch $batchId")
+      (batchId, getExistingBatch(batchId))
     }
   }
 
@@ -250,16 +269,15 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       possibleTargetBatchIds.foreach { batchId =>
         val path = batchIdToPath(batchId)
         fileManager.delete(path)
+        if (metadataCacheEnabled) batchCache.remove(batchId)
         logTrace(s"Removed metadata log file: $path")
       }
     } else {
       // using list to retrieve all elements
-      val batchIds = fileManager.list(metadataPath, batchFilesFilter)
-        .map(f => pathToBatchId(f.getPath))
-
-      for (batchId <- batchIds if batchId < thresholdBatchId) {
+      for (batchId <- listBatches if batchId < thresholdBatchId) {
         val path = batchIdToPath(batchId)
         fileManager.delete(path)
+        if (metadataCacheEnabled) batchCache.remove(batchId)
         logTrace(s"Removed metadata log file: $path")
       }
     }
@@ -277,10 +295,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     for (batchId <- batchIds if batchId > thresholdBatchId) {
       val path = batchIdToPath(batchId)
       fileManager.delete(path)
+      if (metadataCacheEnabled) batchCache.remove(batchId)
       logTrace(s"Removed metadata log file: $path")
     }
   }
 
+
+  /**
+   * List the available batches on the file system. As a workaround for inconsistent S3 listing, it
+   * also takes `batchCache` into consideration to infer a better answer.
+   */
+  protected def listBatches: Array[Long] = {
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      .map(f => pathToBatchId(f.getPath)) ++
+      // Iterating over keySet is not thread-safe. We call `toArray` to make a copy in the lock to
+      // eliminate the race condition.
+      batchCache.synchronized {
+        batchCache.keySet.asScala.toArray
+      }
+    logInfo("BatchIds found from listing: " + batchIds.sorted.mkString(", "))
+
+    if (batchIds.isEmpty) {
+      Array.empty
+    } else {
+      // Assume batch ids are continuous
+      (batchIds.min to batchIds.max).toArray
+    }
+  }
+
   private[sql] def validateVersion(text: String, maxSupportedVersion: Int): Int =
     MetadataVersionUtil.validateVersion(text, maxSupportedVersion)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index 7f00717ea4d..5646f61440e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.streaming
 
 
-import java.{util => ju}
 import java.io.{InputStream, OutputStream}
 import java.nio.charset.StandardCharsets._
 
@@ -47,23 +46,6 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 class OffsetSeqLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
 
-  private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]()
-
-  override def add(batchId: Long, metadata: OffsetSeq): Boolean = {
-    val added = super.add(batchId, metadata)
-    if (added) {
-      // cache metadata as it will be read again
-      cachedMetadata.put(batchId, metadata)
-      // we don't access metadata for (batchId - 2) batches; evict them
-      cachedMetadata.headMap(batchId - 2, true).clear()
-    }
-    added
-  }
-
-  override def get(batchId: Long): Option[OffsetSeq] = {
-    Option(cachedMetadata.get(batchId)).orElse(super.get(batchId))
-  }
-
   override protected def deserialize(in: InputStream): OffsetSeq = {
     // called inside a try-finally where the underlying stream is closed in the caller
     def parseOffset(value: String): OffsetV2 = value match {
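
The new flag is internal and on by default; if the cache ever needs to be ruled out while debugging, it can be disabled per session. A hypothetical snippet (assuming an active SparkSession named `spark`), not part of this patch:

```scala
// Fall back to reading offset/commit log entries from the file system on every access.
spark.conf.set("spark.sql.streaming.metadataCache.enabled", "false")
```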


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org