You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/11/29 20:37:56 UTC

spark git commit: [SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

Repository: spark
Updated Branches:
  refs/heads/master 95f798501 -> f643fe47f


[SPARK-18498][SQL] Revise HDFSMetadataLog API for better testing

Revise HDFSMetadataLog API such that metadata object serialization and final batch file write are separated. This will allow serialization checks without worrying about batch file name formats. marmbrus zsxwing

Existing tests already ensure this API faithfully support core functionality i.e., creation of batch files.

Author: Tyson Condie <tc...@gmail.com>

Closes #15924 from tcondie/SPARK-18498.

Signed-off-by: Michael Armbrust <mi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f643fe47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f643fe47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f643fe47

Branch: refs/heads/master
Commit: f643fe47f4889faf68da3da8d7850ee48df7c22f
Parents: 95f7985
Author: Tyson Condie <tc...@gmail.com>
Authored: Tue Nov 29 12:36:41 2016 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Nov 29 12:37:36 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala   | 100 ++++++++++++-------
 1 file changed, 66 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f643fe47/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d95ec7f..1b41352 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -138,14 +138,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-  /**
-   * Write a batch to a temp file then rename it to the batch file.
-   *
-   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
-   * valid behavior, we still need to prevent it from destroying the files.
-   */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
-    // Use nextId to create a temp file
+  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
     var nextId = 0
     while (true) {
       val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
@@ -153,33 +146,10 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         val output = fileManager.create(tempPath)
         try {
           writer(metadata, output)
+          return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
         }
-        try {
-          // Try to commit the batch
-          // It will fail if there is an existing file (someone has committed the batch)
-          logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
-          fileManager.rename(tempPath, batchIdToPath(batchId))
-
-          // SPARK-17475: HDFSMetadataLog should not leak CRC files
-          // If the underlying filesystem didn't rename the CRC file, delete it.
-          val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
-          if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
-          return
-        } catch {
-          case e: IOException if isFileAlreadyExistsException(e) =>
-            // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
-            // So throw an exception to tell the user this is not a valid behavior.
-            throw new ConcurrentModificationException(
-              s"Multiple HDFSMetadataLog are using $path", e)
-          case e: FileNotFoundException =>
-            // Sometimes, "create" will succeed when multiple writers are calling it at the same
-            // time. However, only one writer can call "rename" successfully, others will get
-            // FileNotFoundException because the first writer has removed it.
-            throw new ConcurrentModificationException(
-              s"Multiple HDFSMetadataLog are using $path", e)
-        }
       } catch {
         case e: IOException if isFileAlreadyExistsException(e) =>
           // Failed to create "tempPath". There are two cases:
@@ -195,10 +165,45 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           // metadata path. In addition, the old Streaming also have this issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
           nextId += 1
-      } finally {
-        fileManager.delete(tempPath)
       }
     }
+    None
+  }
+
+  /**
+   * Write a batch to a temp file then rename it to the batch file.
+   *
+   * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
+   * valid behavior, we still need to prevent it from destroying the files.
+   */
+  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
+    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+      throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
+    try {
+      // Try to commit the batch
+      // It will fail if there is an existing file (someone has committed the batch)
+      logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
+      fileManager.rename(tempPath, batchIdToPath(batchId))
+
+      // SPARK-17475: HDFSMetadataLog should not leak CRC files
+      // If the underlying filesystem didn't rename the CRC file, delete it.
+      val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
+      if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
+    } catch {
+      case e: IOException if isFileAlreadyExistsException(e) =>
+        // If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
+        // So throw an exception to tell the user this is not a valid behavior.
+        throw new ConcurrentModificationException(
+          s"Multiple HDFSMetadataLog are using $path", e)
+      case e: FileNotFoundException =>
+        // Sometimes, "create" will succeed when multiple writers are calling it at the same
+        // time. However, only one writer can call "rename" successfully, others will get
+        // FileNotFoundException because the first writer has removed it.
+        throw new ConcurrentModificationException(
+          s"Multiple HDFSMetadataLog are using $path", e)
+    } finally {
+      fileManager.delete(tempPath)
+    }
   }
 
   private def isFileAlreadyExistsException(e: IOException): Boolean = {
@@ -208,6 +213,22 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       (e.getMessage != null && e.getMessage.startsWith("File already exists: "))
   }
 
+  /**
+   * @return the deserialized metadata in a batch file, or None if file not exist.
+   * @throws IllegalArgumentException when path does not point to a batch file.
+   */
+  def get(batchFile: Path): Option[T] = {
+    if (fileManager.exists(batchFile)) {
+      if (isBatchFile(batchFile)) {
+        get(pathToBatchId(batchFile))
+      } else {
+        throw new IllegalArgumentException(s"File ${batchFile} is not a batch file!")
+      }
+    } else {
+      None
+    }
+  }
+
   override def get(batchId: Long): Option[T] = {
     val batchMetadataFile = batchIdToPath(batchId)
     if (fileManager.exists(batchMetadataFile)) {
@@ -251,6 +272,17 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   }
 
   /**
+   * Get an array of [FileStatus] referencing batch files.
+   * The array is sorted by most recent batch file first to
+   * oldest batch file.
+   */
+  def getOrderedBatchFiles(): Array[FileStatus] = {
+    fileManager.list(metadataPath, batchFilesFilter)
+      .sortBy(f => pathToBatchId(f.getPath))
+      .reverse
+  }
+
+  /**
    * Removes all the log entry earlier than thresholdBatchId (exclusive).
    */
   override def purge(thresholdBatchId: Long): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org