Posted to commits@spark.apache.org by we...@apache.org on 2017/10/25 21:15:49 UTC

spark git commit: [SPARK-22227][CORE] DiskBlockManager.getAllBlocks now tolerates temp files

Repository: spark
Updated Branches:
  refs/heads/master d212ef14b -> b377ef133


[SPARK-22227][CORE] DiskBlockManager.getAllBlocks now tolerates temp files

## What changes were proposed in this pull request?

Prior to this commit, `getAllBlocks` implicitly assumed that the directories
managed by the `DiskBlockManager` contained only files corresponding to
valid block IDs. In reality, this assumption was violated during shuffle,
which produces temporary files in the same directories as the resulting
blocks. As a result, calls to `getAllBlocks` during shuffle were unreliable.
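As a minimal illustration of the failure mode (hypothetical scaffolding, not
Spark code — only the `temp_shuffle_` name format and the thrown message
mirror the pre-patch `BlockId.apply` shown in the diff below):

```scala
import java.util.UUID

object TempFileFailureSketch {
  private val Rdd = "rdd_([0-9]+)_([0-9]+)".r

  // Mimics the pre-patch BlockId.apply, which had no case for temp names.
  def oldParse(name: String): String = name match {
    case Rdd(rddId, split) => s"rdd block $rddId/$split"
    // ... other block patterns elided ...
    case _ => throw new IllegalStateException("Unrecognized BlockId: " + name)
  }

  def main(args: Array[String]): Unit = {
    // Shuffle writers drop files like this next to real block files, so
    // listing the block directories mid-shuffle hit the throwing case.
    val tempFile = s"temp_shuffle_${UUID.randomUUID()}"
    try oldParse(tempFile)
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```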

The fix makes `getAllBlocks` skip any file whose name does not parse as a
block ID. It could be made more efficient, but this is probably good enough.
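With the patch applied, temp block names round-trip through `BlockId.apply`,
and anything else raises the new, catchable `UnrecognizedBlockId`. A short
sketch, assuming a Spark build that contains this commit:

```scala
import java.util.UUID
import org.apache.spark.storage.{BlockId, UnrecognizedBlockId}

object RoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Temp block names now parse instead of throwing.
    val temp = s"temp_shuffle_${UUID.randomUUID()}"
    assert(BlockId(temp).name == temp)

    // Names that are not blocks at all raise the new, catchable
    // UnrecognizedBlockId, which getAllBlocks() catches and skips.
    try BlockId("unmanaged_file")
    catch { case _: UnrecognizedBlockId => println("skipped non-block file") }
  }
}
```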

## How was this patch tested?

`DiskBlockManagerSuite` (a new regression test, `SPARK-22227: non-block files
are skipped`), plus round-trip assertions added to `BlockIdSuite`.

Author: Sergei Lebedev <s....@criteo.com>

Closes #19458 from superbobry/block-id-option.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b377ef13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b377ef13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b377ef13

Branch: refs/heads/master
Commit: b377ef133cdc38d49b460b2cc6ece0b5892804cc
Parents: d212ef1
Author: Sergei Lebedev <s....@criteo.com>
Authored: Wed Oct 25 22:15:44 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Oct 25 22:15:44 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/BlockId.scala    | 16 +++++++++++++---
 .../org/apache/spark/storage/DiskBlockManager.scala | 11 ++++++++++-
 .../org/apache/spark/storage/BlockIdSuite.scala     |  9 +++------
 .../spark/storage/DiskBlockManagerSuite.scala       |  7 +++++++
 4 files changed, 33 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b377ef13/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index a441bae..7ac2c71 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.util.UUID
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -96,6 +97,10 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
 }
 
 @DeveloperApi
+class UnrecognizedBlockId(name: String)
+    extends SparkException(s"Failed to parse $name into a block ID")
+
+@DeveloperApi
 object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
   val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
@@ -104,10 +109,11 @@ object BlockId {
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
+  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
+  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
   val TEST = "test_(.*)".r
 
-  /** Converts a BlockId "name" String back into a BlockId. */
-  def apply(id: String): BlockId = id match {
+  def apply(name: String): BlockId = name match {
     case RDD(rddId, splitIndex) =>
       RDDBlockId(rddId.toInt, splitIndex.toInt)
     case SHUFFLE(shuffleId, mapId, reduceId) =>
@@ -122,9 +128,13 @@ object BlockId {
       TaskResultBlockId(taskId.toLong)
     case STREAM(streamId, uniqueId) =>
       StreamBlockId(streamId.toInt, uniqueId.toLong)
+    case TEMP_LOCAL(uuid) =>
+      TempLocalBlockId(UUID.fromString(uuid))
+    case TEMP_SHUFFLE(uuid) =>
+      TempShuffleBlockId(UUID.fromString(uuid))
     case TEST(value) =>
       TestBlockId(value)
     case _ =>
-      throw new IllegalStateException("Unrecognized BlockId: " + id)
+      throw new UnrecognizedBlockId(name)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b377ef13/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 3d43e3c..a69bcc9 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -100,7 +100,16 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
-    getAllFiles().map(f => BlockId(f.getName))
+    getAllFiles().flatMap { f =>
+      try {
+        Some(BlockId(f.getName))
+      } catch {
+        case _: UnrecognizedBlockId =>
+          // Skip files which do not correspond to blocks, for example temporary
+          // files created by [[SortShuffleWriter]].
+          None
+      }
+    }
   }
 
   /** Produces a unique block id and File suitable for storing local intermediate results. */

http://git-wip-us.apache.org/repos/asf/spark/blob/b377ef13/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index f0c521b..ff47558 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -35,13 +35,8 @@ class BlockIdSuite extends SparkFunSuite {
   }
 
   test("test-bad-deserialization") {
-    try {
-      // Try to deserialize an invalid block id.
+    intercept[UnrecognizedBlockId] {
       BlockId("myblock")
-      fail()
-    } catch {
-      case e: IllegalStateException => // OK
-      case _: Throwable => fail()
     }
   }
 
@@ -139,6 +134,7 @@ class BlockIdSuite extends SparkFunSuite {
     assert(id.id.getMostSignificantBits() === 5)
     assert(id.id.getLeastSignificantBits() === 2)
     assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
   }
 
   test("temp shuffle") {
@@ -151,6 +147,7 @@ class BlockIdSuite extends SparkFunSuite {
     assert(id.id.getMostSignificantBits() === 1)
     assert(id.id.getLeastSignificantBits() === 2)
     assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
   }
 
   test("test") {

http://git-wip-us.apache.org/repos/asf/spark/blob/b377ef13/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 7859b0b..0c4f3c4 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
+import java.util.UUID
 
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -79,6 +80,12 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
   }
 
+  test("SPARK-22227: non-block files are skipped") {
+    val file = diskBlockManager.getFile("unmanaged_file")
+    writeToFile(file, 10)
+    assert(diskBlockManager.getAllBlocks().isEmpty)
+  }
+
   def writeToFile(file: File, numBytes: Int) {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)

