Posted to commits@spark.apache.org by zs...@apache.org on 2020/05/22 23:47:16 UTC

[spark] branch master updated: [SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log file when finding the latest batch ID

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a258b0  [SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log file when finding the latest batch ID
5a258b0 is described below

commit 5a258b0b67ee7c97a90d8b719c7a2171707c9244
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Fri May 22 16:46:17 2020 -0700

    [SPARK-30915][SS] CompactibleFileStreamLog: Avoid reading the metadata log file when finding the latest batch ID
    
    ### What changes were proposed in this pull request?
    
    This patch adds a new method, `getLatestBatchId()`, in CompactibleFileStreamLog as a complement to `getLatest()`. The new method returns the latest batch ID without reading the content of the latest batch metadata log file. Both FileStreamSource and FileStreamSink are updated to use it, avoiding unnecessary latency from reading the log file.
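    
    For illustration, here is a minimal sketch of the call-site change (the `latestBatchId` helper is hypothetical; it assumes a `FileStreamSinkLog` instance is available and mirrors the call sites touched by this patch):
    
    ```scala
    import org.apache.spark.sql.execution.streaming.FileStreamSinkLog
    
    def latestBatchId(fileLog: FileStreamSinkLog): Long = {
      // Before this patch, the only option was getLatest(), which reads and
      // deserializes the latest metadata log file just to learn its batch id:
      //   fileLog.getLatest().map(_._1).getOrElse(-1L)
      // getLatestBatchId() derives the id from the file listing alone, so no
      // log file content is read:
      fileLog.getLatestBatchId().getOrElse(-1L)
    }
    ```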
    
    ### Why are the changes needed?
    
    Once the compacted metadata log file becomes huge, writing outputs for the batch after a compaction (the "compact + 1" batch) is also affected, because the compacted metadata log file is read unnecessarily. This latency can simply be avoided.
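    
    For context, compact batches occur at a fixed interval. The sketch below mirrors the `isCompactionBatch` helper in CompactibleFileStreamLog (the interval value 10 is just an example):
    
    ```scala
    // A batch is a compaction batch when (batchId + 1) is a multiple of the
    // compact interval; the compacted file holds the union of all earlier
    // entries, so it keeps growing over the lifetime of the query.
    def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
      (batchId + 1) % compactInterval == 0
    
    // With compactInterval = 10, batches 9, 19, 29, ... are compact batches.
    // Batch 10 (a "compact + 1" batch) previously re-read the huge batch-9
    // file only to discover the latest batch id.
    assert(isCompactionBatch(9, 10) && !isCompactionBatch(10, 10))
    ```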
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test. Also manually tested with a query that has a huge metadata log on the file stream sink:
    
    > before applying the patch
    
    ![Screen Shot 2020-02-21 at 4 20 19 PM](https://user-images.githubusercontent.com/1317309/75016223-d3ffb180-54cd-11ea-9063-49405943049d.png)
    
    > after applying the patch
    
    ![Screen Shot 2020-02-21 at 4 06 18 PM](https://user-images.githubusercontent.com/1317309/75016220-d235ee00-54cd-11ea-81a7-7c03a43c4db4.png)
    
    Peaks are compact batches; compare the batch immediately after each compact batch, especially the "light brown" area.
    
    Closes #27664 from HeartSaVioR/SPARK-30915.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../streaming/CompactibleFileStreamLog.scala       |  2 +-
 .../sql/execution/streaming/FileStreamSink.scala   |  2 +-
 .../execution/streaming/FileStreamSourceLog.scala  |  2 +-
 .../sql/execution/streaming/HDFSMetadataLog.scala  | 23 ++++--
 .../streaming/FileStreamSinkLogSuite.scala         | 83 ++++++++++++++++++++++
 5 files changed, 102 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 10bcfe6..e8ae0ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -213,7 +213,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    * Returns all files except the deleted ones.
    */
   def allFiles(): Array[T] = {
-    var latestId = getLatest().map(_._1).getOrElse(-1L)
+    var latestId = getLatestBatchId().getOrElse(-1L)
     // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex`
     // is calling this method. This loop will retry the reading to deal with the
     // race condition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index b679f16..3224547 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -142,7 +142,7 @@ class FileStreamSink(
   }
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
+    if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
       val committer = FileCommitProtocol.instantiate(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 7b2ea96..c438877 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -96,7 +96,7 @@ class FileStreamSourceLog(
     val searchKeys = removedBatches.map(_._1)
     val retrievedBatches = if (searchKeys.nonEmpty) {
       logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!")
-      val latestBatchId = getLatest().map(_._1).getOrElse(-1L)
+      val latestBatchId = getLatestBatchId().getOrElse(-1L)
       if (latestBatchId < 0) {
         Map.empty[Long, Option[Array[FileEntry]]]
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index ed0c44d..5c86f8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -182,17 +182,26 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-  override def getLatest(): Option[(Long, T)] = {
-    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+  /**
+   * Return the latest batch id without reading the file. This method only checks the existence
+   * of the file, to avoid the cost of reading and deserializing the log file.
+   */
+  def getLatestBatchId(): Option[Long] = {
+    fileManager.list(metadataPath, batchFilesFilter)
       .map(f => pathToBatchId(f.getPath))
       .sorted(Ordering.Long.reverse)
-    for (batchId <- batchIds) {
-      val batch = get(batchId)
-      if (batch.isDefined) {
-        return Some((batchId, batch.get))
+      .headOption
+  }
+
+  override def getLatest(): Option[(Long, T)] = {
+    getLatestBatchId().map { batchId =>
+      val content = get(batchId).getOrElse {
+        // If we find the latest batch file, we must be able to read it, rather than falling
+        // back to older batches.
+        throw new IllegalStateException(s"failed to read log file for batch $batchId")
       }
+      (batchId, content)
     }
-    None
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index f95daaf..6d615b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -18,7 +18,15 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.lang.{Long => JLong}
+import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.internal.SQLConf
@@ -240,6 +248,44 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     ))
   }
 
+  test("getLatestBatchId") {
+    withCountOpenLocalFileSystemAsLocalFileSystem {
+      val scheme = CountOpenLocalFileSystem.scheme
+      withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+        withTempDir { dir =>
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+            s"$scheme:///${dir.getCanonicalPath}")
+          for (batchId <- 0L to 2L) {
+            sinkLog.add(
+              batchId,
+              Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+          }
+
+          def getCountForOpenOnMetadataFile(batchId: Long): Long = {
+            val path = sinkLog.batchIdToPath(batchId).toUri.getPath
+            CountOpenLocalFileSystem.pathToNumOpenCalled.getOrDefault(path, 0L)
+          }
+
+          CountOpenLocalFileSystem.resetCount()
+
+          assert(sinkLog.getLatestBatchId() === Some(2L))
+          // getLatestBatchId doesn't open the latest metadata log file
+          (0L to 2L).foreach { batchId =>
+            assert(getCountForOpenOnMetadataFile(batchId) === 0L)
+          }
+
+          assert(sinkLog.getLatest().map(_._1).getOrElse(-1L) === 2L)
+          (0L to 1L).foreach { batchId =>
+            assert(getCountForOpenOnMetadataFile(batchId) === 0L)
+          }
+          // getLatest opens the latest metadata log file, which explains the need for
+          // having "getLatestBatchId".
+          assert(getCountForOpenOnMetadataFile(2L) === 1L)
+        }
+      }
+    }
+  }
+
   /**
    * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
    * in SinkFileStatus.
@@ -267,4 +313,41 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
     log.allFiles()
   }
+
+  private def withCountOpenLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountOpenLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountOpenLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+}
+
+class CountOpenLocalFileSystem extends RawLocalFileSystem {
+  import CountOpenLocalFileSystem._
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def open(f: Path, bufferSize: Int): FSDataInputStream = {
+    val path = f.toUri.getPath
+    pathToNumOpenCalled.compute(path, (_, v) => {
+      if (v == null) 1L else v + 1
+    })
+    super.open(f, bufferSize)
+  }
+}
+
+object CountOpenLocalFileSystem {
+  val scheme = s"FileStreamSinkLogSuite${math.abs(Random.nextInt)}fs"
+  val pathToNumOpenCalled = new ConcurrentHashMap[String, JLong]
+
+  def resetCount(): Unit = pathToNumOpenCalled.clear()
 }

