Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/21 10:03:00 UTC

[GitHub] [spark] xuanyuanking opened a new pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

xuanyuanking opened a new pull request #32272:
URL: https://github.com/apache/spark/pull/32272


   ### What changes were proposed in this pull request?
   Initial implementation of RocksDBCheckpointMetadata. It persists the metadata for RocksDBFileManager.
   
   ### Why are the changes needed?
   RocksDBCheckpointMetadata persists the metadata for each committed batch in JSON format. The object contains the names of all RocksDB files and the total number of keys.
   The metadata binds closely with the directory structure of RocksDBFileManager, as described in the design doc - [Directory Structure and Format for Files stored in DFS](https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2).
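
   As a rough illustration of the on-disk layout described above (a version line followed by a single JSON object), here is a minimal sketch that uses only the standard library. The names `SstFile` and `MetadataFormatSketch` are hypothetical stand-ins, and the JSON is rendered by hand rather than through the Jackson mapper the PR uses, so treat it as an approximation of the format, not the PR's implementation.

   ```scala
   import java.nio.charset.StandardCharsets.UTF_8
   import java.nio.file.{Files, Path}

   // Hypothetical stand-in for the PR's RocksDBSstFile.
   final case class SstFile(localFileName: String, dfsSstFileName: String, sizeBytes: Long)

   object MetadataFormatSketch {
     val Version = 1

     // Hand-rendered JSON (assumption: file names need no escaping).
     // The "logFiles" field is left out entirely, as it is when empty.
     def toJson(sstFiles: Seq[SstFile], numKeys: Long): String = {
       val files = sstFiles.map { f =>
         s"""{"localFileName":"${f.localFileName}","dfsSstFileName":"${f.dfsSstFileName}","sizeBytes":${f.sizeBytes}}"""
       }.mkString("[", ",", "]")
       s"""{"sstFiles":$files,"numKeys":$numKeys}"""
     }

     // First line carries the format version, the rest is the JSON payload.
     def write(path: Path, json: String): Unit =
       Files.write(path, s"v$Version\n$json".getBytes(UTF_8))

     // Rejects files whose version line does not match before returning the payload.
     def read(path: Path): String = {
       val content = new String(Files.readAllBytes(path), UTF_8)
       content.split("\n", 2) match {
         case Array(version, json) if version == s"v$Version" => json
         case other =>
           throw new IllegalStateException(
             s"Cannot read RocksDB checkpoint metadata of version ${other.head}")
       }
     }
   }
   ```

   Checking the version line before parsing lets a reader fail fast on a file written in a format it does not understand.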
   
   
   ### Does this PR introduce _any_ user-facing change?
   No. Internal implementation only.
   
   
   ### How was this patch tested?
   Added a new unit test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r618074831



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid writing an empty logFiles field in the JSON.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this

Review comment:
       Why do we need to avoid it?






[GitHub] [spark] AmplabJenkins commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849534792


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43535/
   




[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823944469


   **[Test build #137727 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137727/testReport)** for PR 32272 at commit [`7ce24ce`](https://github.com/apache/spark/commit/7ce24cec183224eb49c8801a2a756262310e9752).




[GitHub] [spark] HeartSaVioR commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849655811


   Jenkins passed. Thanks! Merging to master.




[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638140977



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid writing an empty logFiles field in the JSON.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       Do you mean we sync up non-archived log files in `localOtherFiles` to DFS?
   
   ```scala
   val (localImmutableFiles, localOtherFiles) = allCheckpointFiles.partition(isImmutableFile)
   ...
   zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
   ```
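
   To make the question concrete, here is a minimal sketch of how the predicate in the diff above splits a list of local files. The object name `ImmutableFileSketch`, the `split` helper, and the sample paths are hypothetical, and the `Option` guard around `getParentFile` is an addition of this sketch; only the `.sst`/`.log` suffix checks and the `archive` directory name come from the diff.

   ```scala
   import java.io.File

   object ImmutableFileSketch {
     val LogFilesLocalSubdir = "archive"

     def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")

     def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")

     // A log file counts as archived only when its parent directory is "archive".
     // The Option guard (an assumption of this sketch) covers paths without a parent.
     def isArchivedLogFile(file: File): Boolean =
       isLogFile(file.getName) &&
         Option(file.getParentFile).exists(_.getName == LogFilesLocalSubdir)

     def isImmutableFile(file: File): Boolean =
       isSstFile(file.getName) || isArchivedLogFile(file)

     // Returns (immutable files, all other files), mirroring the partition call above.
     def split(files: Seq[File]): (Seq[File], Seq[File]) =
       files.partition(isImmutableFile)
   }
   ```

   For example, `ImmutableFileSketch.split` applied to `db/000001.sst`, `db/archive/000002.log`, and `db/000003.log` puts the first two on the immutable side, while the non-archived `db/000003.log` lands with the other files.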






[GitHub] [spark] SparkQA removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840444423


   **[Test build #138507 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138507/testReport)** for PR 32272 at commit [`3b91a26`](https://github.com/apache/spark/commit/3b91a268c356544c8b6f75e7c365e0bfa7459e67).




[GitHub] [spark] xuanyuanking commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-830078472


   @HeartSaVioR Thanks for the advice. The comments have been resolved, and yes, it makes sense to reflect them in the PRs. The current implementation of RocksDBCheckpointMetadata corresponds to the metadata file in `${batchId}.zip`.




[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823976991


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42254/
   




[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840468975








[GitHub] [spark] AmplabJenkins commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840469001


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43027/
   




[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823989038








[GitHub] [spark] AmplabJenkins removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840469001


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43027/
   




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r631689200



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -0,0 +1,65 @@
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io._
+import java.nio.charset.Charset
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.util.Utils
+
+class RocksDBSuite extends SparkFunSuite {
+
+  test("checkpoint metadata serde roundtrip") {
+    def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
+      assert(metadata.json == json)
+      withTempDirectory { dir =>
+        val file = new File(dir, "json")
+        FileUtils.write(file, s"v1\n$json", Charset.defaultCharset)
+        assert(metadata == RocksDBCheckpointMetadata.readFromFile(file))
+      }
+    }
+    val sstFiles = Seq(RocksDBSstFile("00001.sst", "00001-uuid.sst", 12345678901234L))
+    val logFiles = Seq(RocksDBLogFile("00001.log", "00001-uuid.log", 12345678901234L))
+
+    // scalastyle:off line.size.limit
+    // should always include sstFiles and numKeys
+    checkJsonRoundtrip(
+      RocksDBCheckpointMetadata(Seq.empty, 0L),
+      """{"sstFiles":[],"numKeys":0}"""
+    )
+    // shouldn't include the "logFiles" field in json when it's empty
+    checkJsonRoundtrip(
+      RocksDBCheckpointMetadata(sstFiles, 12345678901234L),
+      """{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"numKeys":12345678901234}"""
+    )
+    checkJsonRoundtrip(
+      RocksDBCheckpointMetadata(sstFiles, logFiles, 12345678901234L),
+      """{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"logFiles":[{"localFileName":"00001.log","dfsLogFileName":"00001-uuid.log","sizeBytes":12345678901234}],"numKeys":12345678901234}""")
+    // scalastyle:on line.size.limit
+  }
+
+  private def withTempDirectory(f: File => Unit): Unit = {

Review comment:
       Ah yes. Let me update.






[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638128499



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       Yeah, this part is somewhat confusing to me. Do we have a chance to touch log files before they are archived?






[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r632907718



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.

Review comment:
       Regarding backward compatibility, shall we introduce a version number?
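
To make the suggestion concrete, here is a minimal sketch of a versioned metadata file, assuming the version is stored as a plain `v<N>` line ahead of the JSON payload (the revision of the diff quoted later in this thread writes such a `v$VERSION` line). The object name `VersionedMetadataSketch` and its `write`/`read` helpers are hypothetical and are not part of the PR.

```scala
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

// Hypothetical helper, for illustration only: a version header line followed by a JSON payload.
object VersionedMetadataSketch {
  val Version = 1

  /** Writes "v<Version>" on the first line, then the JSON payload. */
  def write(file: File, json: String): Unit = {
    val writer = Files.newBufferedWriter(file.toPath, UTF_8)
    try {
      writer.write(s"v$Version\n")
      writer.write(json)
    } finally {
      writer.close()
    }
  }

  /** Returns the JSON payload, or fails fast if the version line is not the expected one. */
  def read(file: File): String = {
    val reader = Files.newBufferedReader(file.toPath, UTF_8)
    try {
      val versionLine = reader.readLine()
      if (versionLine != s"v$Version") {
        throw new IllegalStateException(s"Cannot read metadata of version $versionLine")
      }
      // Everything after the version line is treated as the payload.
      Iterator.continually(reader.readLine()).takeWhile(_ != null).mkString("\n")
    } finally {
      reader.close()
    }
  }
}
```

Checking the version before parsing means a reader that meets a newer format should fail with a clear message rather than silently misreading fields.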






[GitHub] [spark] HeartSaVioR commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849466192


   retest this, please




[GitHub] [spark] HeartSaVioR commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r624384544



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this

Review comment:
       I think the point here is excluding the empty field (correct?) vs. leaving it in as an empty `[]`. Seems like a small optimization.
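
The two output shapes being compared can be sketched as follows. This is a hand-rolled rendering for illustration only, not the Jackson mapper used in the PR; `EmptyFieldSketch`, `Meta`, and `render` are hypothetical names, file entries are reduced to plain names, and string escaping is deliberately skipped.

```scala
// Hypothetical, simplified stand-in for the metadata class: file names only.
object EmptyFieldSketch {
  final case class Meta(sstFiles: Seq[String], logFiles: Seq[String], numKeys: Long)

  // Renders a JSON array of strings (no escaping; enough for plain file names).
  private def array(xs: Seq[String]): String =
    xs.map(x => "\"" + x + "\"").mkString("[", ",", "]")

  /** Renders the metadata, optionally dropping `logFiles` when it is empty. */
  def render(m: Meta, omitEmptyLogFiles: Boolean): String = {
    val fields = Seq(
      Some("\"sstFiles\":" + array(m.sstFiles)),
      if (omitEmptyLogFiles && m.logFiles.isEmpty) None
      else Some("\"logFiles\":" + array(m.logFiles)),
      Some("\"numKeys\":" + m.numKeys)
    ).flatten
    fields.mkString("{", ",", "}")
  }
}
```

With one SST file and no log files, omitting the field yields `{"sstFiles":["001.sst"],"numKeys":5}`, while keeping it yields `{"sstFiles":["001.sst"],"logFiles":[],"numKeys":5}`. A reader then has to treat a missing `logFiles` as empty.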






[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638383948



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       ```
   Why we need to distinguish archived log files?
   ```
   
   Because we'll build a mapping between archived log files and versions. https://github.com/apache/spark/pull/32582/files#diff-e3d3914d0398d61fdd299b1f8d3e869ec6a86e97606677c724969e421c9bf44eR135
   
   ```
   Do you mean we sync up non-archived log files in localOtherFiles to DFS?
   ```
   Yes. The DFS directory structure was explained in the doc and code comment [here](https://github.com/apache/spark/pull/32582/files#diff-e3d3914d0398d61fdd299b1f8d3e869ec6a86e97606677c724969e421c9bf44eR72)
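
As a rough illustration of the distinction discussed above, the sketch below mirrors the `isImmutableFile` check from the quoted diff: SST files always count as immutable, while a `.log` file counts only once it sits in the local `archive` subdirectory. The object name `ImmutableFileSketch` is hypothetical, and the `Option(...)` guard for a file without a parent directory is an addition that is not in the PR.

```scala
import java.io.File

// Hypothetical standalone copy of the classification logic, for illustration only.
object ImmutableFileSketch {
  val LogFilesLocalSubdir = "archive"

  def isSstFile(name: String): Boolean = name.endsWith(".sst")

  def isLogFile(name: String): Boolean = name.endsWith(".log")

  // A log file is "archived" only if its parent directory is the archive subdirectory.
  // Option(...) guards against a null parent (an assumption added here, not in the PR).
  def isArchivedLogFile(file: File): Boolean =
    isLogFile(file.getName) &&
      Option(file.getParentFile).exists(_.getName == LogFilesLocalSubdir)

  def isImmutableFile(file: File): Boolean =
    isSstFile(file.getName) || isArchivedLogFile(file)
}
```

Under this sketch, `db/archive/000002.log` is classified as immutable, whereas `db/000003.log` is not.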






[GitHub] [spark] SparkQA removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849489480


   **[Test build #139018 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139018/testReport)** for PR 32272 at commit [`f52adac`](https://github.com/apache/spark/commit/f52adac64ae66a3d01da23bee69106bd17b1a4e7).




[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849489480


   **[Test build #139018 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139018/testReport)** for PR 32272 at commit [`f52adac`](https://github.com/apache/spark/commit/f52adac64ae66a3d01da23bee69106bd17b1a4e7).




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r633321491



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       That's right. This is because we only generate the RocksDBLogFile object during checkpointing. Let's keep this comment open, and I'll link it from the caller side for full context. (The caller side, the save path of RocksDBFileManager, is planned for the next PR.)






[GitHub] [spark] xuanyuanking commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-842119218


   ```
   About the file name, RocksDBFileManager.scala doesn't contain any RocksDBFileManager. Shall we rename it?
   ```
   All of this checkpointing metadata is for RocksDBFileManager. My plan is for the next PR to cover the save path of RocksDBFileManager.
   
   ```
   I think we still need to look at how this is going to be used in the full context.
   ```
   Agreed. I plan to use [this comment](https://github.com/apache/spark/pull/32272#discussion_r632908535) as a demo. In the next PR, I'll reference this comment to provide the full context.




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638529708



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       Ah yeah, I see why you were confused. Makes sense. I'll emphasize "sst files and log files" together in the comments to avoid confusing others. Thanks!






[GitHub] [spark] AmplabJenkins commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823977018


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42254/
   




[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r618076328



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into null to avoid writing an empty logFiles field in the JSON.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }

Review comment:
       If a DFS copy can be mapped to more than one local file name, shouldn't two local files be considered the same, even when their local file names differ, if their DFS file names are the same?
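
   To make the question concrete, here is a minimal sketch of the comparison as it is written in the diff above. `TrackedFile`, `IsSameFileSketch`, and the file names are hypothetical stand-ins for illustration and are not part of the PR. The sketch only shows that the check looks at the local file name and the size, and that the DFS file name plays no role in it.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical stand-in for RocksDBImmutableFile, reduced to the fields used here.
final case class TrackedFile(localFileName: String, dfsFileName: String, sizeBytes: Long) {
  // Same rule as isSameFile in the diff: local name and byte length must both match.
  def isSameFile(otherFile: File): Boolean =
    otherFile.getName == localFileName && otherFile.length() == sizeBytes
}

object IsSameFileSketch {
  // Returns (same name and size, different name with same size, same name with different size).
  def demo(): (Boolean, Boolean, Boolean) = {
    val dir = Files.createTempDirectory("rocksdb-sketch").toFile
    val first = new File(dir, "000001.sst")
    val second = new File(dir, "000002.sst")
    Files.write(first.toPath, Array[Byte](1, 2, 3))
    Files.write(second.toPath, Array[Byte](1, 2, 3))

    // The DFS name is an arbitrary placeholder; it is never consulted by isSameFile.
    val tracked = TrackedFile("000001.sst", "000001-dfs-copy.sst", sizeBytes = 3L)

    val sameNameSameSize = tracked.isSameFile(first)
    val otherNameSameSize = tracked.isSameFile(second)

    // Overwrite the first file with four bytes so that only the size differs.
    Files.write(first.toPath, Array[Byte](1, 2, 3, 4))
    val sameNameOtherSize = tracked.isSameFile(first)

    (sameNameSameSize, otherNameSameSize, sameNameOtherSize)
  }
}
```

   Under this rule, two local files with identical contents but different local names are not treated as the same file, which is the case the question above is asking about.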






[GitHub] [spark] xuanyuanking commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-850078509


   Thanks for the review and help!




[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-824106856


   **[Test build #137731 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137731/testReport)** for PR 32272 at commit [`4b61526`](https://github.com/apache/spark/commit/4b61526475215e6dbc16c68c0216750685fa0a37).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r632908535



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into null to avoid writing an empty logFiles field in the JSON.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       Hmm, so if a log file is not archived, it is not immutable?






[GitHub] [spark] HeartSaVioR commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849466064


   Looks like there are no further comments, so I'm going to merge this once the tests pass.




[GitHub] [spark] AmplabJenkins commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-824007339


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42258/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849654670


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/139018/
   






[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638126273



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
   ```
   Is there a log file which is not an archived log file? ... So when do we move a log file to LOG_FILES_LOCAL_SUBDIR and make it immutable?
   ```
   Yes. Per the [example](https://github.com/facebook/rocksdb/wiki/Write-Ahead-Log#life-cycle-of-a-wal), archival is triggered when a flush happens. The save path syncs the necessary files to DFS, and it is called during the commit operation of a state store provider.
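
   As an illustration of the classification discussed here, the following sketch re-implements the predicates from the diff in a standalone object. `ImmutableFileSketch` and the example paths are hypothetical, and the null-safe parent lookup is an added assumption; the diff calls `file.getParentFile.getName` directly.

```scala
import java.io.File

object ImmutableFileSketch {
  // Mirrors LOG_FILES_LOCAL_SUBDIR in the diff.
  val LogFilesLocalSubdir = "archive"

  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")

  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")

  // Assumption: guard against a null parent, which java.io.File returns for a bare file name.
  def isArchivedLogFile(file: File): Boolean =
    isLogFile(file.getName) &&
      Option(file.getParentFile).exists(_.getName == LogFilesLocalSubdir)

  // SST files always count as immutable; log files only once they sit in the archive directory.
  def isImmutableFile(file: File): Boolean =
    isSstFile(file.getName) || isArchivedLogFile(file)

  // Hypothetical paths; the files do not need to exist for these checks.
  val liveWal = new File("/tmp/checkpoint/000010.log")
  val archivedWal = new File("/tmp/checkpoint/archive/000009.log")
  val sstFile = new File("/tmp/checkpoint/000008.sst")
}
```

   With these definitions, `isImmutableFile(liveWal)` is false while `isImmutableFile(archivedWal)` and `isImmutableFile(sstFile)` are true, so a log file that has not been archived yet is not classified as immutable.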






[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823974152


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42254/
   




[GitHub] [spark] viirya commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-844693655


   Thanks @HeartSaVioR. I will take another look with #32582 tomorrow.




[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849534756


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43535/
   






[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r641196335



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       The comment was updated in https://github.com/apache/spark/pull/32582/files#diff-e3d3914d0398d61fdd299b1f8d3e869ec6a86e97606677c724969e421c9bf44eR77-R80






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823977018


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42254/
   




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638519692



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
   ```
   Looks they are shared too as they are immutable files. Can you also update the code comment there?
   ```
   Yes, that's right. All the immutable files (SST files and archived log files) are listed in the checkpoint metadata and shared across checkpoint versions. The un-archived log files are only contained in `${version}.zip` and are tied to a specific checkpoint version.
   
   I think the structure already contains log files. Do you mean I should emphasize it in the comments [here](https://github.com/apache/spark/pull/32582/files#diff-e3d3914d0398d61fdd299b1f8d3e869ec6a86e97606677c724969e421c9bf44eR74-R77)?
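
   To illustrate the sharing described above, here is a minimal sketch, not the code in this PR. `ImmutableFileRef`, `VersionMetadata`, and `sharedDfsFiles` are hypothetical, simplified stand-ins for the classes in the diff, and the DFS file names are made up. It only models the immutable files listed in the metadata; the un-archived log files that live in `${version}.zip` are deliberately left out.

```scala
// Hypothetical, simplified stand-ins for RocksDBImmutableFile / RocksDBCheckpointMetadata.
final case class ImmutableFileRef(localFileName: String, dfsFileName: String, sizeBytes: Long)
final case class VersionMetadata(version: Long, files: Seq[ImmutableFileRef])

object SharingSketch {
  /** Names of DFS files that are referenced by more than one checkpoint version. */
  def sharedDfsFiles(versions: Seq[VersionMetadata]): Set[String] =
    versions
      .flatMap(v => v.files.map(_.dfsFileName).distinct) // count each file once per version
      .groupBy(identity)
      .collect { case (name, refs) if refs.size > 1 => name }
      .toSet

  def main(args: Array[String]): Unit = {
    // Assumed example data: version 2 reuses the SST file already uploaded for version 1.
    val sst1 = ImmutableFileRef("001.sst", "001-aaaa.sst", 10L)
    val sst2 = ImmutableFileRef("002.sst", "002-bbbb.sst", 20L)
    val v1 = VersionMetadata(1L, Seq(sst1))
    val v2 = VersionMetadata(2L, Seq(sst1, sst2))
    println(sharedDfsFiles(Seq(v1, v2)))
  }
}
```

   Because both versions point at the same DFS copy, an immutable file only needs to be uploaded once, which is the point made about SST and archived log files.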






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-824007339


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42258/
   




[GitHub] [spark] HeartSaVioR closed pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #32272:
URL: https://github.com/apache/spark/pull/32272


   




[GitHub] [spark] dongjoon-hyun commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-824502349


   cc @viirya 




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r633321491



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       That's right. This is because we only generate the RocksDBLogFile object during checkpointing. Let's keep this comment open, and I'll link it to the caller side for full context. (The caller side, the save path of RocksDBFileManager, is planned for the next PR.)






[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638524407



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       I saw archived log files in the structure, but the comment only mentions SST files, so I wondered whether they are handled differently from SST files. I think it is better to mention archived log files along with SST files in the comments.






[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r619787523



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }

Review comment:
       The DFS file name contains a UUID, so it shouldn't be the same. Normally we use the local file name to check whether the file already exists locally.
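
   A minimal sketch of that check, under stated assumptions: `TrackedFile`, `newDfsName`, and `partition` are hypothetical names, and the DFS naming scheme shown here is invented for illustration (the real format is not part of this PR). Only `isSameFile` mirrors the method in the diff above.

```scala
import java.io.File
import java.nio.file.Files
import java.util.UUID

// Hypothetical, simplified stand-in for RocksDBSstFile / RocksDBLogFile.
final case class TrackedFile(localFileName: String, dfsFileName: String, sizeBytes: Long) {
  // Same rule as in the PR: a file matches only if both local name and size match.
  def isSameFile(other: File): Boolean =
    other.getName == localFileName && other.length() == sizeBytes
}

object ReuseSketch {
  // Assumed naming scheme: insert a random UUID before the extension, so two
  // uploads of the same local name never collide on DFS.
  def newDfsName(localName: String): String = {
    val dot = localName.lastIndexOf('.')
    if (dot < 0) s"$localName-${UUID.randomUUID()}"
    else s"${localName.substring(0, dot)}-${UUID.randomUUID()}${localName.substring(dot)}"
  }

  /** Splits local files into (already tracked, still to be uploaded). */
  def partition(local: Seq[File], known: Seq[TrackedFile]): (Seq[File], Seq[File]) =
    local.partition(f => known.exists(_.isSameFile(f)))

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("rocksdb-sketch").toFile
    val a = new File(dir, "001.sst")
    val b = new File(dir, "002.sst")
    Files.write(a.toPath, Array[Byte](1, 2, 3))
    Files.write(b.toPath, Array[Byte](4, 5))
    val known = Seq(TrackedFile("001.sst", newDfsName("001.sst"), 3L))
    val (reusable, toUpload) = partition(Seq(a, b), known)
    println(s"reusable=${reusable.map(_.getName)}, toUpload=${toUpload.map(_.getName)}")
  }
}
```

   Since the DFS name is freshly generated on every upload, it cannot be used to recognize a file; the local name plus the size is what stays stable.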






[GitHub] [spark] xuanyuanking commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840445866


   ```
   there're some sorts of uncertainty during reviewing as there's no reference PR. In other words, we are reviewing methods which we don't have idea how these methods will be used.
   ```
   ```
   I'm also OK to review PRs one by one with uncertainty (with faith) and revisit all changes at the last phase.
   ```
   Yes, I agree on both. I propose that, for now, we mark the uncertain methods, or the ones without a caller side, in the PR. When I submit the reference PRs, I can link those comments to the newly created PRs. That should help our review and make sure I don't miss explaining any uncertainty raised during the review.




[GitHub] [spark] viirya commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-846359351


   Sorry for the delay. I will find some time over the weekend to look at this.




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638132159



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       No, we won't change any files generated by RocksDB; we just sync them with DFS.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849534792


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43535/
   




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638519692



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       ```
   Looks they are shared too as they are immutable files. Can you also update the code comment there?
   ```
   Yes, that's right. All the immutable files (SST files and archived log files) are contained in the checkpoint metadata and shared across checkpoint versions. The un-archived log files are only contained in `${version}.zip` and are tied to a specific checkpoint version.
   
   I think the structure already contains log files. Do you mean I need to emphasize it in the comments [here](https://github.com/apache/spark/pull/32582/files#diff-e3d3914d0398d61fdd299b1f8d3e869ec6a86e97606677c724969e421c9bf44eR74-R77)?
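
A rough sketch of how sharing immutable files across checkpoint versions could look, assuming a DFS copy is reused whenever the local name and size match. `FileMapping`, `SharedFilesSketch`, and the `newDfsName` parameter are hypothetical and only illustrate the idea; the actual logic belongs to RocksDBFileManager and is not shown in this PR.

```scala
// Hypothetical, simplified model: one entry per immutable file (SST or archived log).
case class FileMapping(localFileName: String, dfsFileName: String, sizeBytes: Long)

object SharedFilesSketch {
  // Builds the mappings for a new checkpoint version.
  // previous:   mappings recorded by the previous version's metadata
  // localFiles: (local name, size in bytes) of the immutable files in the new version
  // newDfsName: assumed helper that produces a fresh DFS name (for example with a UUID)
  def mappingsForNewVersion(
      previous: Seq[FileMapping],
      localFiles: Seq[(String, Long)],
      newDfsName: String => String): Seq[FileMapping] = {
    localFiles.map { case (name, size) =>
      previous
        .find(m => m.localFileName == name && m.sizeBytes == size) // reuse the DFS copy
        .getOrElse(FileMapping(name, newDfsName(name), size))       // otherwise upload anew
    }
  }
}
```

In this model, a file that is unchanged between two versions keeps its DFS name, so both versions' metadata point at the same DFS copy.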






[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823946560


   **[Test build #137731 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137731/testReport)** for PR 32272 at commit [`4b61526`](https://github.com/apache/spark/commit/4b61526475215e6dbc16c68c0216750685fa0a37).




[GitHub] [spark] xuanyuanking commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-847455327


   ```
   Sorry for the delay. I will find some time over the weekend to look at this.
   ```
   No worries, thanks for the detailed review! Take your time.




[GitHub] [spark] SparkQA removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823946560


   **[Test build #137731 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137731/testReport)** for PR 32272 at commit [`4b61526`](https://github.com/apache/spark/commit/4b61526475215e6dbc16c68c0216750685fa0a37).




[GitHub] [spark] SparkQA removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-823944469


   **[Test build #137727 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137727/testReport)** for PR 32272 at commit [`7ce24ce`](https://github.com/apache/spark/commit/7ce24cec183224eb49c8801a2a756262310e9752).




[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638497268



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       The DFS directory structure only explains SST files and doesn't mention log files. Does `metadata` also contain the mapping between `archive/00008.log` and `logs/00008-[uuid3].log`? Are archived log files also shared across checkpoint versions?






[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r619785921



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this

Review comment:
       This is related to how RocksDB is used: we don't always have log files, but we must always have SST files.
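
To illustrate the effect of dropping an empty `logFiles` field, here is a dependency-free sketch. It is not the PR's implementation, which relies on Jackson; `OptionalFieldSketch.toJson` is a hypothetical helper, and file entries are reduced to plain names that are assumed not to need JSON escaping.

```scala
object OptionalFieldSketch {
  // Renders a sequence of names as a JSON array of strings (no escaping, by assumption).
  private def jsonArray(names: Seq[String]): String =
    names.map(n => "\"" + n + "\"").mkString("[", ",", "]")

  // sstFiles and numKeys are always written; logFiles is written only when non-empty.
  def toJson(sstFiles: Seq[String], logFiles: Seq[String], numKeys: Long): String = {
    val fields = Seq(
      Some("\"sstFiles\":" + jsonArray(sstFiles)),
      if (logFiles.isEmpty) None else Some("\"logFiles\":" + jsonArray(logFiles)),
      Some("\"numKeys\":" + numKeys)
    ).flatten
    fields.mkString("{", ",", "}")
  }
}
```

A reader of such output has to tolerate a missing `logFiles` key and treat it as "no log files".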

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.

Review comment:
       Yes. It can be mapped to more than one local file, but for different tasks. The most common scenario is a task/stage retry.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }

Review comment:
       The DFS file name contains a UUID, so it shouldn't be the same. Normally we use the local file name to check whether the file already exists locally.
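
       As a rough illustration of that check, here is a minimal standalone sketch. `TrackedFile` and `missingLocally` are hypothetical names, not part of this PR, and the local directory is modelled as (file name, length) pairs rather than `java.io.File` objects. Only the name-plus-size rule is taken from `isSameFile` in the diff.

```scala
// Simplified stand-in for RocksDBImmutableFile; not the class from the PR.
final case class TrackedFile(localFileName: String, dfsFileName: String, sizeBytes: Long) {
  // Mirrors isSameFile in the diff: both the name and the size must match.
  def isSame(name: String, length: Long): Boolean =
    name == localFileName && length == sizeBytes
}

object LocalFileCheck {
  // Hypothetical helper: returns the checkpoint entries that have no matching local file.
  // `local` lists (file name, length in bytes) pairs found in the local directory.
  def missingLocally(entries: Seq[TrackedFile], local: Seq[(String, Long)]): Seq[TrackedFile] =
    entries.filterNot(e => local.exists { case (name, len) => e.isSame(name, len) })
}
```

       The DFS name never takes part in the comparison, which matches the point above: it carries a UUID and is not expected to match anything on local disk.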






[GitHub] [spark] HeartSaVioR commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r624388116



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)

Review comment:
       Would it produce the same output as `json`? This one doesn't handle the empty logFiles field. If not, is it intentional that json and prettyJson behave differently?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io._
+import java.nio.charset.Charset
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark._
+import org.apache.spark.util.Utils
+
+class RocksDBSuite extends SparkFunSuite {
+
+  test("checkpoint metadata serde roundtrip") {
+    def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
+      assert(metadata.json == json)
+      withTempDirectory { dir =>
+        val file = new File(dir, "json")
+        FileUtils.write(file, s"v1\n$json", Charset.defaultCharset)
+        assert(metadata == RocksDBCheckpointMetadata.readFromFile(file))
+      }
+    }
+    val sstFiles = Seq(RocksDBSstFile("00001.sst", "00001-uuid.sst", 12345678901234L))
+    val logFiles = Seq(RocksDBLogFile("00001.log", "00001-uuid.log", 12345678901234L))
+
+    // scalastyle:off line.size.limit
+    // should always include sstFiles and numKeys
+    checkJsonRoundtrip(
+      RocksDBCheckpointMetadata(Seq.empty, 0L),
+      """{"sstFiles":[],"numKeys":0}"""
+    )
+    // shouldn't include the "logFiles" field in json when it's empty
+    checkJsonRoundtrip(
+      RocksDBCheckpointMetadata(sstFiles, 12345678901234L),
+      """{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"numKeys":12345678901234}"""
+    )
+    checkJsonRoundtrip(
+      RocksDBCheckpointMetadata(sstFiles, logFiles, 12345678901234L),
+      """{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"logFiles":[{"localFileName":"00001.log","dfsLogFileName":"00001-uuid.log","sizeBytes":12345678901234}],"numKeys":12345678901234}""")
+    // scalastyle:on line.size.limit
+  }
+
+  private def withTempDirectory(f: File => Unit): Unit = {

Review comment:
       If I remember correctly, withTempDir is defined in SparkFunSuite, so you can just use that method.
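
       For readers unfamiliar with the pattern, the following is a standalone sketch of such a helper using only the JDK. It is not Spark's implementation; `TempDirSketch` and the directory prefix are made up for illustration. In the suite itself, the inherited helper would replace the private `withTempDirectory`.

```scala
import java.io.File
import java.nio.file.Files

object TempDirSketch {
  // Delete a file or directory tree, children first.
  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for plain files, hence the Option wrapper.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  // Loan pattern: create a temp directory, hand it to `body`, and always clean up.
  def withTempDir(body: File => Unit): Unit = {
    val dir = Files.createTempDirectory("rocksdb-metadata-test").toFile
    try body(dir) finally deleteRecursively(dir)
  }
}
```

       The test body then only deals with the directory it is given, and cleanup happens even when an assertion inside the body fails.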






[GitHub] [spark] HeartSaVioR commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-827262787


   Sorry for getting to this late. I just went through the design doc and left some comments. It would be nice if we could resolve the comments on the design doc and reflect them in the current and following PRs. Thanks!




[GitHub] [spark] xuanyuanking commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-842119218


   ```
   About the file name, RocksDBFileManager.scala doesn't contain any RocksDBFileManager. Shall we rename it?
   ```
   All of this checkpoint metadata is for RocksDBFileManager. My plan is for the next PR to cover the save path of RocksDBFileManager.
   
   ```
   I think we still need to look at how this is going to be used in the full context.
   ```
   Agreed. I plan to use [this comment](https://github.com/apache/spark/pull/32272#discussion_r632908535) as a demo. In the next PR, I'll reference this comment to provide the full context.




[GitHub] [spark] HeartSaVioR commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849467165


   @xuanyuanking Could you please push an empty commit in case Jenkins isn't working? Thanks in advance!




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r631689463



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this

Review comment:
       Yes, the `logFiles` field does not always have a value.






[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638139884



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       So no non-archived log files will be touched by this. Why do we need to distinguish archived log files?






[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r619785921



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this

Review comment:
       It's related to how RocksDB is used: we don't always have log files, but we must have SST files.
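
       To make the intended output shape concrete, here is a small hand-rolled sketch that always writes `sstFiles` and `numKeys` but skips `logFiles` when it is empty. It does not use the Jackson mapper from the diff. `FileEntry`, `OmitEmptySketch`, and the single `dfsFileName` field are illustrative assumptions, and no string escaping is done.

```scala
// Simplified stand-in for the file entries in the PR; field names are illustrative.
final case class FileEntry(localFileName: String, dfsFileName: String, sizeBytes: Long)

object OmitEmptySketch {
  // Renders one entry as a JSON object (no escaping; names are assumed to be plain ASCII).
  private def entryJson(e: FileEntry): String =
    s"""{"localFileName":"${e.localFileName}","dfsFileName":"${e.dfsFileName}","sizeBytes":${e.sizeBytes}}"""

  private def arrayJson(files: Seq[FileEntry]): String =
    files.map(entryJson).mkString("[", ",", "]")

  // sstFiles and numKeys are always written; logFiles only when it is non-empty.
  def render(sstFiles: Seq[FileEntry], logFiles: Seq[FileEntry], numKeys: Long): String = {
    val fields = Seq(
      Some(s""""sstFiles":${arrayJson(sstFiles)}"""),
      if (logFiles.nonEmpty) Some(s""""logFiles":${arrayJson(logFiles)}""") else None,
      Some(s""""numKeys":$numKeys""")
    ).flatten
    fields.mkString("{", ",", "}")
  }
}
```

       With no files at all this yields `{"sstFiles":[],"numKeys":0}`, the same shape the first case of the roundtrip test in this PR expects.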






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840597560


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138507/
   




[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840580229


   **[Test build #138507 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138507/testReport)** for PR 32272 at commit [`3b91a26`](https://github.com/apache/spark/commit/3b91a268c356544c8b6f75e7c365e0bfa7459e67).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] xuanyuanking commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-843247088


   To provide more context for the functions in this PR, I created a WIP PR (#32582) and referenced the comment there. Please check whether we can ship this for now. Thanks :) @HeartSaVioR @viirya




[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r618075956



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.

Review comment:
       When do we reuse the DFS copies?
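
The quoted diff does not show the reuse logic itself. One plausible reading, sketched below purely as an assumption, is that a local file whose name and size match an entry in the previous checkpoint's metadata keeps the DFS copy recorded there, and only the remaining files are uploaded. `TrackedFile`, `DfsReuseSketch` and `partition` are hypothetical names; only the name-and-size comparison mirrors `isSameFile` from the PR.

```scala
import java.io.File

// Simplified stand-in for RocksDBImmutableFile from the PR (hypothetical name).
final case class TrackedFile(localFileName: String, dfsFileName: String, sizeBytes: Long) {
  // Same rule as isSameFile in the PR: name and size must both match.
  def isSameFile(otherFile: File): Boolean =
    otherFile.getName == localFileName && otherFile.length() == sizeBytes
}

object DfsReuseSketch {
  // Hypothetical helper: returns (entries whose DFS copy can be reused, files still to upload).
  def partition(previous: Seq[TrackedFile], localFiles: Seq[File]): (Seq[TrackedFile], Seq[File]) = {
    val reused = localFiles.flatMap(f => previous.find(_.isSameFile(f)))
    val toUpload = localFiles.filterNot(f => previous.exists(_.isSameFile(f)))
    (reused, toUpload)
  }
}
```

Under this assumption, reuse would happen at commit time whenever an SST or archived log file from an earlier version is still present unchanged in the local checkpoint directory.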






[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r632908535



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       Hmm, so if a log file is not archived, it is not immutable? But `RocksDBLogFile` is actually a `RocksDBImmutableFile`?
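
To make the question concrete, here is a minimal standalone sketch, not code from the PR, that copies the helper predicates from the quoted diff into a hypothetical `ImmutableFileCheck` object. The only deviation is an `Option` guard for a file without a parent directory, which the quoted code does not handle. The paths in the note below are made up.

```scala
import java.io.File

// Standalone copy of the helpers quoted in the diff above, for illustration only.
object ImmutableFileCheck {
  val LOG_FILES_LOCAL_SUBDIR = "archive"

  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")

  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")

  // A log file counts as archived only when its parent directory is named "archive".
  // Option(...) guards against a File with no parent (an addition to the quoted code).
  def isArchivedLogFile(file: File): Boolean =
    isLogFile(file.getName) &&
      Option(file.getParentFile).exists(_.getName == LOG_FILES_LOCAL_SUBDIR)

  def isImmutableFile(file: File): Boolean =
    isSstFile(file.getName) || isArchivedLogFile(file)
}
```

With these definitions, `isImmutableFile` is true for `/tmp/db/000012.sst` and `/tmp/db/archive/000007.log` but false for `/tmp/db/000009.log`. That asymmetry is what the comment points at: the `RocksDBLogFile` type extends `RocksDBImmutableFile`, yet the predicate only accepts log files under `archive`.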






[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849652688


   **[Test build #139018 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/139018/testReport)** for PR 32272 at commit [`f52adac`](https://github.com/apache/spark/commit/f52adac64ae66a3d01da23bee69106bd17b1a4e7).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r632907718



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.

Review comment:
       <del>About backward-compatibility, shall we introduce a version number?</del>
   
   Oh, I saw it below. I also forgot I saw it before...
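
For reference, the versioning in this PR amounts to a one-line `v$VERSION` header written ahead of the JSON payload and checked on read. A minimal standalone sketch of that convention follows. It treats the JSON as an opaque single-line string instead of going through Jackson or json4s, and the object name `VersionedMetadataFile` is hypothetical.

```scala
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

object VersionedMetadataFile {
  val VERSION = 1

  // Write the version marker on the first line, then the payload.
  def write(file: File, payload: String): Unit = {
    val writer = Files.newBufferedWriter(file.toPath, UTF_8)
    try {
      writer.write(s"v$VERSION\n")
      writer.write(payload)
    } finally {
      writer.close()
    }
  }

  // Reject any file whose first line is not the expected version marker.
  def read(file: File): String = {
    val reader = Files.newBufferedReader(file.toPath, UTF_8)
    try {
      val versionLine = reader.readLine()
      if (versionLine != s"v$VERSION") {
        throw new IllegalStateException(
          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
      }
      // The payload is an opaque single line here; the PR parses it as JSON instead.
      Option(reader.readLine()).getOrElse("")
    } finally {
      reader.close()
    }
  }
}
```

A reader built this way fails fast on a file written by a different format version, rather than attempting to parse JSON it may not understand.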






[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-824105487


   **[Test build #137727 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137727/testReport)** for PR 32272 at commit [`7ce24ce`](https://github.com/apache/spark/commit/7ce24cec183224eb49c8801a2a756262310e9752).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds the following public classes _(experimental)_:
     * `case class RocksDBCheckpointMetadata(`
     * `sealed trait RocksDBImmutableFile `




[GitHub] [spark] xuanyuanking commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849468978


   @HeartSaVioR Sure, thanks for reminding.




[GitHub] [spark] AmplabJenkins commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840597560


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138507/
   




[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r618075800



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.

Review comment:
       Does it mean that a DFS copy can be mapped to more than one local file name?






[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r637473571



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is same as the file described by this class.
+   * A file is same only when the name and the size are same.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       I read the save path, but I am still a bit confused. Is there a log file that is not an archived log file? I mean, why is only an archived log file immutable? `isArchivedLogFile` decides whether a log file is archived by looking at its parent dir, so when do we move a log file to `LOG_FILES_LOCAL_SUBDIR` and make it immutable?






[GitHub] [spark] viirya commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r638505924



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid write a empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.
+ */
+sealed trait RocksDBImmutableFile {
+  def localFileName: String
+  def dfsFileName: String
+  def sizeBytes: Long
+
+  /**
+   * Whether another local file is the same as the file described by this class.
+   * A file is the same only when both the name and the size match.
+   */
+  def isSameFile(otherFile: File): Boolean = {
+    otherFile.getName == localFileName && otherFile.length() == sizeBytes
+  }
+}
+
+/**
+ * Class to represent a RocksDB SST file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBSstFile(
+    localFileName: String,
+    dfsSstFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsSstFileName
+}
+
+/**
+ * Class to represent a RocksDB Log file. Since this is converted to JSON,
+ * any changes to these MUST be backward-compatible.
+ */
+private[sql] case class RocksDBLogFile(
+    localFileName: String,
+    dfsLogFileName: String,
+    sizeBytes: Long) extends RocksDBImmutableFile {
+
+  override def dfsFileName: String = dfsLogFileName
+}
+
+object RocksDBImmutableFile {
+  val SST_FILES_DFS_SUBDIR = "SSTs"
+  val LOG_FILES_DFS_SUBDIR = "logs"
+  val LOG_FILES_LOCAL_SUBDIR = "archive"
+
+  def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
+    if (isSstFile(localFileName)) {
+      RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
+    } else if (isLogFile(localFileName)) {
+      RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
+    } else {
+      null
+    }
+  }
+
+  def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
+
+  def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
+
+  private def isArchivedLogFile(file: File): Boolean =
+    isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
+
+  def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)

Review comment:
       It looks like they are shared too, since they are immutable files. Can you also update the code comment there?






[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-840444423


   **[Test build #138507 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138507/testReport)** for PR 32272 at commit [`3b91a26`](https://github.com/apache/spark/commit/3b91a268c356544c8b6f75e7c365e0bfa7459e67).




[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r619786926



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid writing an empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
+
+  def writeToFile(metadataFile: File): Unit = {
+    val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
+    try {
+      writer.write(s"v$VERSION\n")
+      writer.write(this.json)
+    } finally {
+      writer.close()
+    }
+  }
+
+  def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
+}
+
+/** Helper class for [[RocksDBCheckpointMetadata]] */
+object RocksDBCheckpointMetadata {
+  val VERSION = 1
+
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between classes and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ScalaObjectMapper
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
+    val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
+    try {
+      val versionLine = reader.readLine()
+      if (versionLine != s"v$VERSION") {
+        throw new IllegalStateException(
+          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+      }
+      Serialization.read[RocksDBCheckpointMetadata](reader)
+    } finally {
+      reader.close()
+    }
+  }
+
+  def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
+    val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
+    val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
+
+    RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
+  }
+}
+
+/**
+ * A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
+ * its copy on DFS. Since these files are immutable, their DFS copies can be reused.

Review comment:
       Yes. It can be mapped to more than one local file, but for different tasks. The most common scenario is a task/stage retry.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r635722361



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid writing an empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)

Review comment:
       OK, I see where it is used: just for logging. Got it.






[GitHub] [spark] xuanyuanking commented on a change in pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #32272:
URL: https://github.com/apache/spark/pull/32272#discussion_r631688986



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+
+import scala.collection.Seq
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+/**
+ * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
+ * changes to this MUST be backward-compatible.
+ */
+case class RocksDBCheckpointMetadata(
+    sstFiles: Seq[RocksDBSstFile],
+    logFiles: Seq[RocksDBLogFile],
+    numKeys: Long) {
+  import RocksDBCheckpointMetadata._
+
+  def json: String = {
+    // We turn this field into a null to avoid writing an empty logFiles field in the json.
+    val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
+    mapper.writeValueAsString(nullified)
+  }
+
+  def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)

Review comment:
       The only difference is the `logFiles` field. `prettyJson` provides a readable string for logging, while `json` is used for writing the metadata file.
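
       To make the difference concrete, here is a minimal sketch of the two renderings. It is not the code in this PR: `MetadataJsonSketch`, `SstFile`, `LogFile`, `Metadata` and `logString` are hypothetical stand-ins, and the JSON is assembled by hand only so the example has no dependencies (the PR itself uses Jackson and json4s, and `prettyJson` additionally pretty-prints its output). The sketch assumes file names need no JSON escaping.

```scala
// Hypothetical, dependency-free stand-ins for the classes in the diff above.
object MetadataJsonSketch {
  final case class SstFile(localFileName: String, dfsSstFileName: String, sizeBytes: Long)
  final case class LogFile(localFileName: String, dfsLogFileName: String, sizeBytes: Long)

  // Quote a string value; assumes the value needs no JSON escaping.
  private def q(s: String): String = "\"" + s + "\""

  // Render one `"name":value` pair.
  private def field(name: String, value: String): String = q(name) + ":" + value

  private def sstJson(f: SstFile): String = Seq(
    field("localFileName", q(f.localFileName)),
    field("dfsSstFileName", q(f.dfsSstFileName)),
    field("sizeBytes", f.sizeBytes.toString)
  ).mkString("{", ",", "}")

  private def logJson(f: LogFile): String = Seq(
    field("localFileName", q(f.localFileName)),
    field("dfsLogFileName", q(f.dfsLogFileName)),
    field("sizeBytes", f.sizeBytes.toString)
  ).mkString("{", ",", "}")

  final case class Metadata(sstFiles: Seq[SstFile], logFiles: Seq[LogFile], numKeys: Long) {
    private def render(includeEmptyLogFiles: Boolean): String = {
      val sst = Some(field("sstFiles", sstFiles.map(sstJson).mkString("[", ",", "]")))
      // The only place where the two renderings diverge.
      val logs =
        if (logFiles.isEmpty && !includeEmptyLogFiles) None
        else Some(field("logFiles", logFiles.map(logJson).mkString("[", ",", "]")))
      val keys = Some(field("numKeys", numKeys.toString))
      Seq(sst, logs, keys).flatten.mkString("{", ",", "}")
    }

    /** File-oriented form: an empty logFiles field is left out. */
    def json: String = render(includeEmptyLogFiles = false)

    /** Log-oriented form: every field is written, even an empty logFiles. */
    def logString: String = render(includeEmptyLogFiles = true)
  }
}
```

       With no log files, `json` in this sketch contains only `sstFiles` and `numKeys`, while `logString` also carries `"logFiles":[]`. When `logFiles` is non-empty, both forms contain the same fields.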






[GitHub] [spark] SparkQA commented on pull request #32272: [SPARK-35172][SS] The implementation of RocksDBCheckpointMetadata

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32272:
URL: https://github.com/apache/spark/pull/32272#issuecomment-849514744


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43535/
   

