You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/09/24 23:18:36 UTC

spark git commit: [SPARK-10761] Refactor DiskBlockObjectWriter to not require BlockId

Repository: spark
Updated Branches:
  refs/heads/master b3862d3c5 -> 8023242e7


[SPARK-10761] Refactor DiskBlockObjectWriter to not require BlockId

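The DiskBlockObjectWriter constructor took a BlockId parameter that the writer itself never used; callers only read it back in order to look up the underlying file for deletion. As part of some general cleanup in these interfaces, this patch refactors the constructor to eliminate this parameter and changes revertPartialWritesAndClose() to return the File that was written, so that callers can delete it directly.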

Author: Josh Rosen <jo...@databricks.com>

Closes #8871 from JoshRosen/disk-block-object-writer-blockid-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8023242e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8023242e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8023242e

Branch: refs/heads/master
Commit: 8023242e77e4a799de6edc490078285684549b6d
Parents: b3862d3
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Sep 24 14:18:33 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Sep 24 14:18:33 2015 -0700

----------------------------------------------------------------------
 .../sort/BypassMergeSortShuffleWriter.java      |  9 +++---
 .../shuffle/IndexShuffleBlockResolver.scala     |  3 +-
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../spark/storage/DiskBlockObjectWriter.scala   |  7 +++--
 .../unsafe/UnsafeShuffleWriterSuite.java        |  1 -
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  1 -
 .../BypassMergeSortShuffleWriterSuite.scala     |  1 -
 .../storage/DiskBlockObjectWriterSuite.scala    | 33 ++++++++++----------
 8 files changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 0b8b604..f5d80bb 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -151,7 +151,7 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
         } finally {
           Closeables.close(in, copyThrewException);
         }
-        if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
+        if (!partitionWriters[i].fileSegment().file().delete()) {
           logger.error("Unable to delete file for partition {}", i);
         }
       }
@@ -168,12 +168,11 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
   public void stop() throws IOException {
     if (partitionWriters != null) {
       try {
-        final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
         for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
-          writer.revertPartialWritesAndClose();
-          if (!diskBlockManager.getFile(writer.blockId()).delete()) {
-            logger.error("Error while deleting file for block {}", writer.blockId());
+          File file = writer.revertPartialWritesAndClose();
+          if (!file.delete()) {
+            logger.error("Error while deleting file {}", file.getAbsolutePath());
           }
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 65887d1..5e4c2b5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -119,9 +119,8 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
 }
 
 private[spark] object IndexShuffleBlockResolver {
-  // No-op reduce ID used in interactions with disk store and DiskBlockObjectWriter.
+  // No-op reduce ID used in interactions with disk store.
   // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
   // shuffle outputs for several reduces are glommed into a single file.
-  // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
   val NOOP_REDUCE_ID = 0
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index bca3942..47bd2ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -669,7 +669,7 @@ private[spark] class BlockManager(
       writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
     val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
-    new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream,
+    new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
       syncWrites, writeMetrics)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 49d9154..80d426f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -34,7 +34,6 @@ import org.apache.spark.util.Utils
  * reopened again.
  */
 private[spark] class DiskBlockObjectWriter(
-    val blockId: BlockId,
     file: File,
     serializerInstance: SerializerInstance,
     bufferSize: Int,
@@ -144,8 +143,10 @@ private[spark] class DiskBlockObjectWriter(
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
    * when there are runtime exceptions. This method will not throw, though it may be
    * unsuccessful in truncating written data.
+   *
+   * @return the file that this DiskBlockObjectWriter wrote to.
    */
-  def revertPartialWritesAndClose() {
+  def revertPartialWritesAndClose(): File = {
     // Discard current writes. We do this by flushing the outstanding writes and then
     // truncating the file to its initial position.
     try {
@@ -160,12 +161,14 @@ private[spark] class DiskBlockObjectWriter(
       val truncateStream = new FileOutputStream(file, true)
       try {
         truncateStream.getChannel.truncate(initialPosition)
+        file
       } finally {
         truncateStream.close()
       }
     } catch {
       case e: Exception =>
         logError("Uncaught exception while reverting partial writes to file " + file, e)
+        file
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index a266b0c..d218344 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -129,7 +129,6 @@ public class UnsafeShuffleWriterSuite {
         Object[] args = invocationOnMock.getArguments();
 
         return new DiskBlockObjectWriter(
-          (BlockId) args[0],
           (File) args[1],
           (SerializerInstance) args[2],
           (Integer) args[3],

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 445a37b..a5bbaa9 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -127,7 +127,6 @@ public class UnsafeExternalSorterSuite {
         Object[] args = invocationOnMock.getArguments();
 
         return new DiskBlockObjectWriter(
-          (BlockId) args[0],
           (File) args[1],
           (SerializerInstance) args[2],
           (Integer) args[3],

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index cc7342f..341f56d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -72,7 +72,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
       override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
         val args = invocation.getArguments
         new DiskBlockObjectWriter(
-          args(0).asInstanceOf[BlockId],
           args(1).asInstanceOf[File],
           args(2).asInstanceOf[SerializerInstance],
           args(3).asInstanceOf[Int],

http://git-wip-us.apache.org/repos/asf/spark/blob/8023242e/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index 66af6e1..7c19531 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -20,7 +20,6 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.SparkConf
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.JavaSerializer
@@ -41,8 +40,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("verify write metrics") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.write(Long.box(20), Long.box(30))
     // Record metrics update on every write
@@ -63,8 +62,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("verify write metrics on revert") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.write(Long.box(20), Long.box(30))
     // Record metrics update on every write
@@ -86,8 +85,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("Reopening a closed block writer") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.open()
     writer.close()
@@ -99,8 +98,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -115,8 +114,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("commitAndClose() should be idempotent") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -133,8 +132,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("revertPartialWritesAndClose() should be idempotent") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -151,8 +150,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("fileSegment() can only be called after commitAndClose() has been called") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     for (i <- 1 to 1000) {
       writer.write(i, i)
     }
@@ -165,8 +164,8 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("commitAndClose() without ever opening or writing") {
     val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
-    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    val writer = new DiskBlockObjectWriter(
+      file, new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
     writer.commitAndClose()
     assert(writer.fileSegment().length === 0)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org