You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "siying (via GitHub)" <gi...@apache.org> on 2023/05/09 20:21:48 UTC

[GitHub] [spark] siying commented on a diff in pull request #41099: [SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State Store Provider

siying commented on code in PR #41099:
URL: https://github.com/apache/spark/pull/41099#discussion_r1189073051


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put record | -1 |
+ */
+class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path,
+                                compressionCodec: CompressionCodec) extends Logging {
+
+  private def compressStream(outputStream: DataOutputStream): DataOutputStream = {
+    val compressed = compressionCodec.compressedOutputStream(outputStream)
+    new DataOutputStream(compressed)
+  }
+
+  private var backingFileStream: CancellableFSDataOutputStream =
+    fm.createAtomic(file, overwriteIfPossible = true)
+  private var compressedStream: DataOutputStream = compressStream(backingFileStream)
+  var size = 0
+
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+
+  def abort(): Unit = {
+    try {
+      if (backingFileStream != null) backingFileStream.cancel()
+      if (compressedStream != null) IOUtils.closeQuietly(compressedStream)
+    } catch {
+      // Closing the compressedStream causes the stream to write/flush flush data into the
+      // rawStream. Since the rawStream is already closed, there may be errors.
+      // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps
+      // IOException into FSError.
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel changelog file $file for state store provider " +
+          s"with exception=$ex")
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+
+  def commit(): Unit = {
+    try {
+      // -1 in the key length field mean EOF.
+      compressedStream.writeInt(-1)
+      compressedStream.close()
+    } catch {
+      case e: Throwable =>
+        abort()
+        throw e
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+}
+
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: Array[Byte])
+ * A put record is returned as a ByteArrayPair(key, value)
+ * A delete record is return as a ByteArrayPair(key, null)
+ */
+class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path,
+                      compressionCodec: CompressionCodec)
+  extends Iterator[ByteArrayPair] with Logging {
+
+  private def decompressStream(inputStream: DataInputStream): DataInputStream = {
+    val compressed = compressionCodec.compressedInputStream(inputStream)
+    new DataInputStream(compressed)
+  }
+
+  private val sourceStream = try {
+    fm.open(fileToRead)
+  } catch {
+    case f: FileNotFoundException =>
+      throw new IllegalStateException(
+        s"Error reading streaming state file of $this: $fileToRead does not exist. " +
+          s"If the stream job is restarted with a new or updated state operation, please" +
+          s" create a new checkpoint location or clear the existing checkpoint location.", f)
+  }
+  private val input: DataInputStream = decompressStream(sourceStream)
+  // A buffer that hold the next record to return.
+  private var byteArrayPair: ByteArrayPair = null
+  private var eof = false
+
+  override def hasNext: Boolean = {
+    maybeReadNext()
+    byteArrayPair != null
+  }
+
+  override def next(): ByteArrayPair = {
+    maybeReadNext()
+    val nextByteArrayPair = byteArrayPair
+    byteArrayPair = null
+    nextByteArrayPair
+  }
+
+  def close(): Unit = { if (input != null) input.close() }
+
+  private def maybeReadNext(): Unit = {
+    if (!eof && byteArrayPair == null) {
+      val keySize = input.readInt()
+      // A -1 key size mean end of file.
+      if (keySize == -1) {
+        eof = true
+      } else if (keySize < 0) {
+        throw new IOException(
+          s"Error reading streaming state file $fileToRead of $this: key size cannot be $keySize")
+      } else {
+        val keyBuffer = new Array[Byte](keySize)

Review Comment:
   We don't have to do it in this PR, but ideally keyBuffer and valueBuffer (and perhaps byteArrayPair) can be reused across record.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -334,25 +373,59 @@ class RocksDB(
         loadedVersion = -1  // invalidate loaded version
         throw t
     } finally {
-      if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
-      silentDeleteRecursively(checkpointDir, s"committing $newVersion")
       // reset resources as either 1) we already pushed the changes and it has been committed or
       // 2) commit has failed and the current version is "invalidated".
       release()
     }
   }
 
+  private def shouldCreateSnapshot(): Boolean = {
+    if (enableChangelogCheckpointing) {
+      assert(changelogWriter.isDefined)
+      val newVersion = loadedVersion + 1
+      newVersion - fileManager.getLastUploadedSnapshotVersion() >= conf.minDeltasForSnapshot ||
+        changelogWriter.get.size > 1000
+    } else true
+  }
+
+  private def uploadSnapshot(): Unit = {
+    val localCheckpoint = synchronized {
+      val checkpoint = latestCheckpoint
+      latestCheckpoint = None
+      checkpoint
+    }
+    localCheckpoint match {
+      case Some(RocksDBCheckpoint(localDir, version, numKeys)) =>
+        try {
+          val uploadTime = timeTakenMs {
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+          }
+          logInfo(s"Upload snapshot of version $version, time taken: $uploadTime ms")
+        } finally {
+          localCheckpoint.foreach(_.close())
+        }
+      case _ =>
+    }
+  }
+
   /**
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     loadedVersion = -1L
+    changelogWriter.foreach(_.abort())
+    // Make sure changelogWriter gets recreated next time.
+    changelogWriter = None
     release()
     logInfo(s"Rolled back to $loadedVersion")
   }
 
   def cleanup(): Unit = {
+    if (enableChangelogCheckpointing) {
+      uploadSnapshot()
+    }

Review Comment:
   With this new logic here, should we still call the function cleanup()?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -334,25 +373,59 @@ class RocksDB(
         loadedVersion = -1  // invalidate loaded version
         throw t
     } finally {
-      if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
-      silentDeleteRecursively(checkpointDir, s"committing $newVersion")
       // reset resources as either 1) we already pushed the changes and it has been committed or
       // 2) commit has failed and the current version is "invalidated".
       release()
     }
   }
 
+  private def shouldCreateSnapshot(): Boolean = {
+    if (enableChangelogCheckpointing) {
+      assert(changelogWriter.isDefined)
+      val newVersion = loadedVersion + 1
+      newVersion - fileManager.getLastUploadedSnapshotVersion() >= conf.minDeltasForSnapshot ||
+        changelogWriter.get.size > 1000

Review Comment:
   1000 entries feel relatively small to trigger a snapshop upload. Consider to remove the limit, or make it very large, such as 1 million.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -334,25 +373,59 @@ class RocksDB(
         loadedVersion = -1  // invalidate loaded version
         throw t
     } finally {
-      if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
-      silentDeleteRecursively(checkpointDir, s"committing $newVersion")
       // reset resources as either 1) we already pushed the changes and it has been committed or
       // 2) commit has failed and the current version is "invalidated".
       release()
     }
   }
 
+  private def shouldCreateSnapshot(): Boolean = {
+    if (enableChangelogCheckpointing) {
+      assert(changelogWriter.isDefined)
+      val newVersion = loadedVersion + 1
+      newVersion - fileManager.getLastUploadedSnapshotVersion() >= conf.minDeltasForSnapshot ||

Review Comment:
   We are using uploaded version here to determine whether we need to create a new checkpoint. Is it possible that between a checkpoint created to the checkpoint is uploaded, we are creating a new checkpoint for each commit?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org