Posted to commits@spark.apache.org by pw...@apache.org on 2015/04/02 08:41:33 UTC

spark git commit: [SPARK-6627] Some clean-up in shuffle code.

Repository: spark
Updated Branches:
  refs/heads/master 40df5d49b -> 6562787b9


[SPARK-6627] Some clean-up in shuffle code.

Before diving into review #4450 I took a look through the existing shuffle
code to learn how it works. Unfortunately, there are some very
confusing things in this code. This patch makes a few small changes
to simplify things. The changes are not easy to describe concisely
because of how convoluted the issues were, but they are fairly small
logically:

1. There is a trait named `ShuffleBlockManager` that deals with only
   one logical function: retrieving shuffle block data given shuffle
   block coordinates. This trait has two implementors, FileShuffleBlockManager
   and IndexShuffleBlockManager. Confusingly, the vast majority of those
   implementations have nothing to do with this particular functionality.
   So I've renamed the trait to ShuffleBlockResolver and documented it.
2. The aforementioned trait had two almost identical methods, for no good
   reason. I removed one method (getBytes) and modified callers to use the
   other one. I think the behavior is preserved in all cases.
3. The sort shuffle code uses the identifier "0" in the reduce slot of a
   BlockId as a placeholder. I made it into a constant, since it needs to
   be consistent across multiple places.

I think for (3) there is actually a better solution that would avoid the
need for this type of workaround/hack in the first place, but it's more
complex, so I'm punting on it for now.
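
For (2), the caller-side change is mechanical. A minimal sketch of the
new call shape (hypothetical helper; `resolver` stands in for whatever
ShuffleBlockResolver the active ShuffleManager returns):

  import java.nio.ByteBuffer
  import org.apache.spark.shuffle.ShuffleBlockResolver
  import org.apache.spark.storage.ShuffleBlockId

  // Old call site: resolver.getBytes(id)
  // New call site: the single remaining method, wrapped the same way
  // the removed getBytes implementation wrapped it.
  def readShuffleBytes(
      resolver: ShuffleBlockResolver,
      id: ShuffleBlockId): Option[ByteBuffer] = {
    Option(resolver.getBlockData(id).nioByteBuffer())
  }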

Author: Patrick Wendell <pa...@databricks.com>

Closes #5286 from pwendell/cleanup and squashes the following commits:

c71fbc7 [Patrick Wendell] Open interface back up for testing
f36edd5 [Patrick Wendell] Code review feedback
d1c0494 [Patrick Wendell] Style fix
a406079 [Patrick Wendell] [HOTFIX] Some clean-up in shuffle code.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6562787b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6562787b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6562787b

Branch: refs/heads/master
Commit: 6562787b963204763a33e1c4e9d192db913af1fc
Parents: 40df5d4
Author: Patrick Wendell <pa...@databricks.com>
Authored: Wed Apr 1 23:42:09 2015 -0700
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Wed Apr 1 23:42:09 2015 -0700

----------------------------------------------------------------------
 .../spark/shuffle/FileShuffleBlockManager.scala |  7 +---
 .../shuffle/IndexShuffleBlockManager.scala      | 27 +++++++------
 .../spark/shuffle/ShuffleBlockManager.scala     | 37 ------------------
 .../spark/shuffle/ShuffleBlockResolver.scala    | 41 ++++++++++++++++++++
 .../apache/spark/shuffle/ShuffleManager.scala   |  5 ++-
 .../apache/spark/shuffle/ShuffleWriter.scala    |  2 +-
 .../spark/shuffle/hash/HashShuffleManager.scala |  8 ++--
 .../spark/shuffle/sort/SortShuffleManager.scala |  9 +++--
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  6 +--
 .../org/apache/spark/storage/BlockManager.scala | 14 +++----
 .../spark/util/collection/ExternalSorter.scala  |  6 ++-
 .../shuffle/hash/HashShuffleManagerSuite.scala  |  2 +-
 .../apache/spark/tools/StoragePerfTester.scala  |  2 +-
 13 files changed, 83 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index d0178df..5be3ed7 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -67,7 +67,7 @@ private[spark] trait ShuffleWriterGroup {
 // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
 private[spark]
 class FileShuffleBlockManager(conf: SparkConf)
-  extends ShuffleBlockManager with Logging {
+  extends ShuffleBlockResolver with Logging {
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf)
 
@@ -175,11 +175,6 @@ class FileShuffleBlockManager(conf: SparkConf)
     }
   }
 
-  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
-    val segment = getBlockData(blockId)
-    Some(segment.nioByteBuffer())
-  }
-
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     if (consolidateShuffleFiles) {
       // Search all file groups associated with this shuffle.

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 87fd161..50edb5a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -27,6 +27,8 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
 
+import IndexShuffleBlockManager.NOOP_REDUCE_ID
+
 /**
  * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
  * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
@@ -39,25 +41,18 @@ import org.apache.spark.storage._
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
 private[spark]
-class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
+class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf)
 
-  /**
-   * Mapping to a single shuffleBlockId with reduce ID 0.
-   * */
-  def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = {
-    ShuffleBlockId(shuffleId, mapId, 0)
-  }
-
   def getDataFile(shuffleId: Int, mapId: Int): File = {
-    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
+    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
   }
 
   private def getIndexFile(shuffleId: Int, mapId: Int): File = {
-    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
+    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
   }
 
   /**
@@ -97,10 +92,6 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
     }
   }
 
-  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
-    Some(getBlockData(blockId).nioByteBuffer())
-  }
-
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
@@ -123,3 +114,11 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
 
   override def stop(): Unit = {}
 }
+
+private[spark] object IndexShuffleBlockManager {
+  // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
+  // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
+  // shuffle outputs for several reduces are glommed into a single file.
+  // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
+  val NOOP_REDUCE_ID = 0
+}
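
To make the constant's role concrete, here is a small sketch (not part
of the patch) of how callers now name the consolidated per-map output;
the old consolidateId(shuffleId, mapId) helper is gone, so the
placeholder reduce slot is spelled out explicitly, mirroring the
SortShuffleWriter change below:

  import org.apache.spark.shuffle.IndexShuffleBlockManager
  import org.apache.spark.storage.ShuffleBlockId

  // Hypothetical helper: builds the block ID for a map task's single
  // consolidated output file using the shared placeholder constant.
  def consolidatedBlockId(shuffleId: Int, mapId: Int): ShuffleBlockId =
    ShuffleBlockId(shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)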

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
deleted file mode 100644
index b521f0c..0000000
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.nio.ByteBuffer
-import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.storage.ShuffleBlockId
-
-private[spark]
-trait ShuffleBlockManager {
-  type ShuffleId = Int
-
-  /**
-   * Get shuffle block data managed by the local ShuffleBlockManager.
-   * @return Some(ByteBuffer) if block found, otherwise None.
-   */
-  def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]
-
-  def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
-
-  def stop(): Unit
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
new file mode 100644
index 0000000..4342b0d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.nio.ByteBuffer
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.storage.ShuffleBlockId
+
+/**
+ * Implementers of this trait understand how to retrieve block data for a logical shuffle block
+ * identifier (i.e. map, reduce, and shuffle). Implementations may use files or file segments to
+ * encapsulate shuffle data. This is used by the BlockStore to abstract over different shuffle
+ * implementations when shuffle data is retrieved.
+ */
+private[spark]
+trait ShuffleBlockResolver {
+  type ShuffleId = Int
+
+  /**
+   * Retrieve the data for the specified block. If the data for that block is not available,
+   * throws an unspecified exception.
+   */
+  def getBlockData(blockId: ShuffleBlockId): ManagedBuffer
+
+  def stop(): Unit
+}
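
As a quick illustration of the trait's contract, a toy in-memory
implementation might look like the following (a sketch for testing only;
real implementations resolve to files or file segments, and
NioManagedBuffer is the existing ByteBuffer-backed wrapper in
org.apache.spark.network.buffer):

  package org.apache.spark.shuffle

  import java.nio.ByteBuffer
  import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
  import org.apache.spark.storage.ShuffleBlockId

  private[spark] class InMemoryShuffleBlockResolver(
      blocks: Map[ShuffleBlockId, ByteBuffer]) extends ShuffleBlockResolver {

    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
      // Per the contract, throw if the data for this block is unavailable.
      val buf = blocks.getOrElse(blockId, sys.error(s"No data for $blockId"))
      new NioManagedBuffer(buf.duplicate())
    }

    override def stop(): Unit = ()
  }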

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index a44a8e1..978366d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -55,7 +55,10 @@ private[spark] trait ShuffleManager {
     */
   def unregisterShuffle(shuffleId: Int): Boolean
 
-  def shuffleBlockManager: ShuffleBlockManager
+  /**
+   * Return a resolver capable of retrieving shuffle block data based on block coordinates.
+   */
+  def shuffleBlockResolver: ShuffleBlockResolver
 
   /** Shut down this ShuffleManager. */
   def stop(): Unit
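
With the renamed accessor, a hypothetical local read path looks like:

  import org.apache.spark.SparkEnv
  import org.apache.spark.network.buffer.ManagedBuffer
  import org.apache.spark.storage.ShuffleBlockId

  // Any component holding the ShuffleManager asks for its resolver by the
  // clearer name and fetches block data through it.
  def fetchLocal(id: ShuffleBlockId): ManagedBuffer =
    SparkEnv.get.shuffleManager.shuffleBlockResolver.getBlockData(id)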

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
index b934480..f6e6fe5 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala
@@ -23,7 +23,7 @@ import org.apache.spark.scheduler.MapStatus
  * Obtained inside a map task to write out records to the shuffle system.
  */
 private[spark] trait ShuffleWriter[K, V] {
-  /** Write a bunch of records to this task's output */
+  /** Write a sequence of records to this task's output */
   def write(records: Iterator[_ <: Product2[K, V]]): Unit
 
   /** Close this writer, passing along whether the map completed */

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index 62e0629..2a7df8d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -53,20 +53,20 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
   override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
       : ShuffleWriter[K, V] = {
     new HashShuffleWriter(
-      shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
+      shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
   }
 
   /** Remove a shuffle's metadata from the ShuffleManager. */
   override def unregisterShuffle(shuffleId: Int): Boolean = {
-    shuffleBlockManager.removeShuffle(shuffleId)
+    shuffleBlockResolver.removeShuffle(shuffleId)
   }
 
-  override def shuffleBlockManager: FileShuffleBlockManager = {
+  override def shuffleBlockResolver: FileShuffleBlockManager = {
     fileShuffleBlockManager
   }
 
   /** Shut down this ShuffleManager. */
   override def stop(): Unit = {
-    shuffleBlockManager.stop()
+    shuffleBlockResolver.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index bda30a5..0497036 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -58,7 +58,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
     val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
     shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
     new SortShuffleWriter(
-      shuffleBlockManager, baseShuffleHandle, mapId, context)
+      shuffleBlockResolver, baseShuffleHandle, mapId, context)
   }
 
   /** Remove a shuffle's metadata from the ShuffleManager. */
@@ -66,18 +66,19 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
     if (shuffleMapNumber.containsKey(shuffleId)) {
       val numMaps = shuffleMapNumber.remove(shuffleId)
       (0 until numMaps).map{ mapId =>
-        shuffleBlockManager.removeDataByMap(shuffleId, mapId)
+        shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
       }
     }
     true
   }
 
-  override def shuffleBlockManager: IndexShuffleBlockManager = {
+  override def shuffleBlockResolver: IndexShuffleBlockManager = {
     indexShuffleBlockManager
   }
 
   /** Shut down this ShuffleManager. */
   override def stop(): Unit = {
-    shuffleBlockManager.stop()
+    shuffleBlockResolver.stop()
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 55ea0f1..a066435 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -58,8 +58,7 @@ private[spark] class SortShuffleWriter[K, V, C](
       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
       // care whether the keys get sorted in each partition; that will be done on the reduce side
       // if the operation being run is sortByKey.
-      sorter = new ExternalSorter[K, V, V](
-        None, Some(dep.partitioner), None, dep.serializer)
+      sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
       sorter.insertAll(records)
     }
 
@@ -67,7 +66,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     // because it just opens a single file, so is typically too fast to measure accurately
     // (see SPARK-3570).
     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
-    val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
+    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
     shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
 
@@ -100,3 +99,4 @@ private[spark] class SortShuffleWriter[K, V, C](
     }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1dff09a..fc31296 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -301,7 +301,7 @@ private[spark] class BlockManager(
    */
   override def getBlockData(blockId: BlockId): ManagedBuffer = {
     if (blockId.isShuffle) {
-      shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
+      shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
     } else {
       val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
         .asInstanceOf[Option[ByteBuffer]]
@@ -439,14 +439,10 @@ private[spark] class BlockManager(
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
     if (blockId.isShuffle) {
-      val shuffleBlockManager = shuffleManager.shuffleBlockManager
-      shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match {
-        case Some(bytes) =>
-          Some(bytes)
-        case None =>
-          throw new BlockException(
-            blockId, s"Block $blockId not found on disk, though it should be")
-      }
+      val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+      // TODO: This should gracefully handle case where local block is not available. Currently
+      // downstream code will throw an exception.
+      Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
     } else {
       doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
     }
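
The TODO above could eventually be handled at this call site. One
possible (purely hypothetical) shape, mapping the resolver's failure to
None instead of letting it propagate:

  import java.nio.ByteBuffer
  import scala.util.control.NonFatal
  import org.apache.spark.shuffle.ShuffleBlockResolver
  import org.apache.spark.storage.ShuffleBlockId

  // Sketch of the "graceful" variant: a missing local shuffle block
  // yields None rather than an exception from downstream code.
  def getLocalShuffleBytes(
      resolver: ShuffleBlockResolver,
      blockId: ShuffleBlockId): Option[ByteBuffer] = {
    try {
      Option(resolver.getBlockData(blockId).nioByteBuffer())
    } catch {
      case NonFatal(_) => None
    }
  }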

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index b962c10..7bd3c78 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -664,6 +664,8 @@ private[spark] class ExternalSorter[K, V, C](
   }
 
   /**
+   * Exposed for testing purposes.
+   *
    * Return an iterator over all the data written to this object, grouped by partition and
    * aggregated by the requested aggregator. For each partition we then have an iterator over its
    * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
@@ -673,7 +675,7 @@ private[spark] class ExternalSorter[K, V, C](
    * For now, we just merge all the spilled files in once pass, but this can be modified to
    * support hierarchical merging.
    */
-  def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
+  def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
     val usingMap = aggregator.isDefined
     val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
     if (spills.isEmpty && partitionWriters == null) {
@@ -781,7 +783,7 @@ private[spark] class ExternalSorter[K, V, C](
   /**
    * Read a partition file back as an iterator (used in our iterator method)
    */
-  def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = {
+  private def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = {
     if (writer.isOpen) {
       writer.commitAndClose()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
index 6790388..b834dc0 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala
@@ -54,7 +54,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
     sc = new SparkContext("local", "test", conf)
 
     val shuffleBlockManager =
-      SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager]
+      SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
 
     val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
       new ShuffleWriteMetrics)

http://git-wip-us.apache.org/repos/asf/spark/blob/6562787b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 15ee950..6b666a0 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -59,7 +59,7 @@ object StoragePerfTester {
     val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
 
     def writeOutputBytes(mapId: Int, total: AtomicLong) = {
-      val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+      val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
         new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
       val writers = shuffle.writers
       for (i <- 1 to recordsPerMap) {

