You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/19 04:30:04 UTC

spark git commit: [SPARK-14667] Remove HashShuffleManager

Repository: spark
Updated Branches:
  refs/heads/master 4b3d1294a -> 5e92583d3


[SPARK-14667] Remove HashShuffleManager

## What changes were proposed in this pull request?
The sort shuffle manager has been the default since Spark 1.2. It is time to remove the old hash shuffle manager.

## How was this patch tested?
Removed some tests related to the old manager.

Author: Reynold Xin <rx...@databricks.com>

Closes #12423 from rxin/SPARK-14667.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e92583d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e92583d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e92583d

Branch: refs/heads/master
Commit: 5e92583d38e11d39deb429a39725443111205a4a
Parents: 4b3d129
Author: Reynold Xin <rx...@databricks.com>
Authored: Mon Apr 18 19:30:00 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Apr 18 19:30:00 2016 -0700

----------------------------------------------------------------------
 .../shuffle/ExternalShuffleBlockResolver.java   |  13 +-
 .../shuffle/protocol/ExecutorShuffleInfo.java   |   2 +-
 .../ExternalShuffleBlockResolverSuite.java      |  33 +----
 .../shuffle/ExternalShuffleCleanupSuite.java    |   3 -
 .../ExternalShuffleIntegrationSuite.java        |  38 +----
 .../network/shuffle/TestShuffleDataContext.java |  11 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |   5 +-
 .../shuffle/FileShuffleBlockResolver.scala      | 136 ------------------
 .../spark/shuffle/hash/HashShuffleManager.scala |  81 -----------
 .../spark/shuffle/hash/HashShuffleWriter.scala  | 141 -------------------
 .../org/apache/spark/ContextCleanerSuite.scala  |  81 +----------
 .../org/apache/spark/HashShuffleSuite.scala     |  30 ----
 .../storage/BlockManagerReplicationSuite.scala  |   4 +-
 .../spark/storage/BlockManagerSuite.scala       |   4 +-
 .../collection/ExternalAppendOnlyMapSuite.scala |   2 -
 docs/configuration.md                           |   9 --
 .../execution/exchange/ShuffleExchange.scala    |   4 -
 .../streaming/ReceivedBlockHandlerSuite.scala   |   4 +-
 18 files changed, 19 insertions(+), 582 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index ce5c68e..3071201 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -49,7 +49,7 @@ import org.apache.spark.network.util.TransportConf;
  * Manages converting shuffle BlockIds into physical segments of local files, from a process outside
  * of Executors. Each Executor must register its own configuration about where it stores its files
  * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
- * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
+ * from Spark's IndexShuffleBlockResolver.
  */
 public class ExternalShuffleBlockResolver {
   private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
@@ -185,8 +185,6 @@ public class ExternalShuffleBlockResolver {
 
     if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
       return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
-    } else if ("hash".equals(executor.shuffleManager)) {
-      return getHashBasedShuffleBlockData(executor, blockId);
     } else {
       throw new UnsupportedOperationException(
         "Unsupported shuffle manager: " + executor.shuffleManager);
@@ -251,15 +249,6 @@ public class ExternalShuffleBlockResolver {
   }
 
   /**
-   * Hash-based shuffle data is simply stored as one file per block.
-   * This logic is from FileShuffleBlockResolver.
-   */
-  private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
-    File shuffleFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, blockId);
-    return new FileSegmentManagedBuffer(conf, shuffleFile, 0, shuffleFile.length());
-  }
-
-  /**
    * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
    * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
    * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index 102d4ef..93758bd 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -33,7 +33,7 @@ public class ExecutorShuffleInfo implements Encodable {
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager or HashShuffleManager) that the executor is using. */
+  /** Shuffle manager (SortShuffleManager) that the executor is using. */
   public final String shuffleManager;
 
   @JsonCreator

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index d9b5f02..de4840a 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -38,9 +38,6 @@ public class ExternalShuffleBlockResolverSuite {
   private static final String sortBlock0 = "Hello!";
   private static final String sortBlock1 = "World!";
 
-  private static final String hashBlock0 = "Elementary";
-  private static final String hashBlock1 = "Tabular";
-
   private static TestShuffleDataContext dataContext;
 
   private static final TransportConf conf =
@@ -51,13 +48,10 @@ public class ExternalShuffleBlockResolverSuite {
     dataContext = new TestShuffleDataContext(2, 5);
 
     dataContext.create();
-    // Write some sort and hash data.
+    // Write some sort data.
     dataContext.insertSortShuffleData(0, 0, new byte[][] {
         sortBlock0.getBytes(StandardCharsets.UTF_8),
         sortBlock1.getBytes(StandardCharsets.UTF_8)});
-    dataContext.insertHashShuffleData(1, 0, new byte[][] {
-        hashBlock0.getBytes(StandardCharsets.UTF_8),
-        hashBlock1.getBytes(StandardCharsets.UTF_8)});
   }
 
   @AfterClass
@@ -118,27 +112,6 @@ public class ExternalShuffleBlockResolverSuite {
   }
 
   @Test
-  public void testHashShuffleBlocks() throws IOException {
-    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
-    resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("hash"));
-
-    InputStream block0Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
-    String block0 = CharStreams.toString(
-        new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
-    block0Stream.close();
-    assertEquals(hashBlock0, block0);
-
-    InputStream block1Stream =
-      resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
-    String block1 = CharStreams.toString(
-        new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
-    block1Stream.close();
-    assertEquals(hashBlock1, block1);
-  }
-
-  @Test
   public void jsonSerializationOfExecutorRegistration() throws IOException {
     ObjectMapper mapper = new ObjectMapper();
     AppExecId appId = new AppExecId("foo", "bar");
@@ -147,7 +120,7 @@ public class ExternalShuffleBlockResolverSuite {
     assertEquals(parsedAppId, appId);
 
     ExecutorShuffleInfo shuffleInfo =
-      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
+      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort");
     String shuffleJson = mapper.writeValueAsString(shuffleInfo);
     ExecutorShuffleInfo parsedShuffleInfo =
       mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
@@ -158,7 +131,7 @@ public class ExternalShuffleBlockResolverSuite {
     String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
     assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
     String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
-      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
+      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}";
     assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 43d0201..fa5cd13 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -144,9 +144,6 @@ public class ExternalShuffleCleanupSuite {
     dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
         "ABC".getBytes(StandardCharsets.UTF_8),
         "DEF".getBytes(StandardCharsets.UTF_8)});
-    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] {
-        "GHI".getBytes(StandardCharsets.UTF_8),
-        "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)});
     return dataContext;
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index ecbbe7b..067c815 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -50,12 +50,9 @@ public class ExternalShuffleIntegrationSuite {
 
   static String APP_ID = "app-id";
   static String SORT_MANAGER = "sort";
-  static String HASH_MANAGER = "hash";
 
   // Executor 0 is sort-based
   static TestShuffleDataContext dataContext0;
-  // Executor 1 is hash-based
-  static TestShuffleDataContext dataContext1;
 
   static ExternalShuffleBlockHandler handler;
   static TransportServer server;
@@ -87,10 +84,6 @@ public class ExternalShuffleIntegrationSuite {
     dataContext0.create();
     dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
 
-    dataContext1 = new TestShuffleDataContext(6, 2);
-    dataContext1.create();
-    dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
-
     conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     handler = new ExternalShuffleBlockHandler(conf, null);
     TransportContext transportContext = new TransportContext(conf, handler);
@@ -100,7 +93,6 @@ public class ExternalShuffleIntegrationSuite {
   @AfterClass
   public static void afterAll() {
     dataContext0.cleanup();
-    dataContext1.cleanup();
     server.close();
   }
 
@@ -193,39 +185,17 @@ public class ExternalShuffleIntegrationSuite {
   }
 
   @Test
-  public void testFetchHash() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(HASH_MANAGER));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.successBlocks);
-    assertTrue(execFetch.failedBlocks.isEmpty());
-    assertBufferListsEqual(execFetch.buffers, Lists.newArrayList(exec1Blocks));
-    execFetch.releaseBuffers();
-  }
-
-  @Test
-  public void testFetchWrongShuffle() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
-  }
-
-  @Test
   public void testFetchInvalidShuffle() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo("unknown sort manager"));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "shuffle_1_0_0" });
+    registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
     assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
   }
 
   @Test
   public void testFetchWrongBlockId() throws Exception {
-    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
-    FetchResult execFetch = fetchBlocks("exec-1",
-      new String[] { "rdd_1_0_0" });
+    registerExecutor("exec-1", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "rdd_1_0_0" });
     assertTrue(execFetch.successBlocks.isEmpty());
     assertEquals(Sets.newHashSet("rdd_1_0_0"), execFetch.failedBlocks);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 7ac1ca1..62a1fb4 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -29,7 +29,7 @@ import com.google.common.io.Files;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 
 /**
- * Manages some sort- and hash-based shuffle data, including the creation
+ * Manages some sort-shuffle data, including the creation
  * and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
  */
 public class TestShuffleDataContext {
@@ -85,15 +85,6 @@ public class TestShuffleDataContext {
     }
   }
 
-  /** Creates reducer blocks in a hash-based data format within our local dirs. */
-  public void insertHashShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
-    for (int i = 0; i < blocks.length; i ++) {
-      String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
-      Files.write(blocks[i],
-        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
-    }
-  }
-
   /**
    * Creates an ExecutorShuffleInfo object based on the given shuffle manager which targets this
    * context's directories.

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 3d11db7..27497e2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -298,9 +298,8 @@ object SparkEnv extends Logging {
 
     // Let the user specify short names for shuffle managers
     val shortShuffleMgrNames = Map(
-      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
-      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
-      "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
+      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
     val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
     val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
deleted file mode 100644
index be1e84a..0000000
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage._
-import org.apache.spark.util.Utils
-
-/** A group of writers for a ShuffleMapTask, one writer per reducer. */
-private[spark] trait ShuffleWriterGroup {
-  val writers: Array[DiskBlockObjectWriter]
-
-  /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
-  def releaseWriters(success: Boolean): Unit
-}
-
-/**
- * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
- * per reducer.
- */
-// Note: Changes to the format in this file should be kept in sync with
-// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
-private[spark] class FileShuffleBlockResolver(conf: SparkConf)
-  extends ShuffleBlockResolver with Logging {
-
-  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
-
-  private lazy val blockManager = SparkEnv.get.blockManager
-
-  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
-  private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
-
-  /**
-   * Contains all the state related to a particular shuffle.
-   */
-  private class ShuffleState(val numReducers: Int) {
-    /**
-     * The mapIds of all map tasks completed on this Executor for this shuffle.
-     */
-    val completedMapTasks = new ConcurrentLinkedQueue[Int]()
-  }
-
-  private val shuffleStates = new ConcurrentHashMap[ShuffleId, ShuffleState]
-
-  /**
-   * Get a ShuffleWriterGroup for the given map task, which will register it as complete
-   * when the writers are closed successfully
-   */
-  def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
-      writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
-    new ShuffleWriterGroup {
-      private val shuffleState: ShuffleState = {
-        // Note: we do _not_ want to just wrap this java ConcurrentHashMap into a Scala map and use
-        // .getOrElseUpdate() because that's actually NOT atomic.
-        shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
-        shuffleStates.get(shuffleId)
-      }
-      val openStartTime = System.nanoTime
-      val serializerInstance = serializer.newInstance()
-      val writers: Array[DiskBlockObjectWriter] = {
-        Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
-          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          val blockFile = blockManager.diskBlockManager.getFile(blockId)
-          val tmp = Utils.tempFileWith(blockFile)
-          blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics)
-        }
-      }
-      // Creating the file to write to and creating a disk writer both involve interacting with
-      // the disk, so should be included in the shuffle write time.
-      writeMetrics.incWriteTime(System.nanoTime - openStartTime)
-
-      override def releaseWriters(success: Boolean) {
-        shuffleState.completedMapTasks.add(mapId)
-      }
-    }
-  }
-
-  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
-    val file = blockManager.diskBlockManager.getFile(blockId)
-    new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
-  }
-
-  /** Remove all the blocks / files and metadata related to a particular shuffle. */
-  def removeShuffle(shuffleId: ShuffleId): Boolean = {
-    // Do not change the ordering of this, if shuffleStates should be removed only
-    // after the corresponding shuffle blocks have been removed
-    val cleaned = removeShuffleBlocks(shuffleId)
-    shuffleStates.remove(shuffleId)
-    cleaned
-  }
-
-  /** Remove all the blocks / files related to a particular shuffle. */
-  private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
-    Option(shuffleStates.get(shuffleId)) match {
-      case Some(state) =>
-        for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
-          val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
-          val file = blockManager.diskBlockManager.getFile(blockId)
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file.getPath()}")
-          }
-        }
-        logInfo("Deleted all files for shuffle " + shuffleId)
-        true
-      case None =>
-        logInfo("Could not find files for shuffle " + shuffleId + " for deleting")
-        false
-    }
-  }
-
-  override def stop(): Unit = {}
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
deleted file mode 100644
index 6bb4ff9..0000000
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.hash
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.shuffle._
-
-/**
- * A ShuffleManager using hashing, that creates one output file per reduce partition on each
- * mapper (possibly reusing these across waves of tasks).
- */
-private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
-
-  if (!conf.getBoolean("spark.shuffle.spill", true)) {
-    logWarning(
-      "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
-        " Shuffle will continue to spill to disk when necessary.")
-  }
-
-  private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
-
-  override val shortName: String = "hash"
-
-  /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
-  override def registerShuffle[K, V, C](
-      shuffleId: Int,
-      numMaps: Int,
-      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
-    new BaseShuffleHandle(shuffleId, numMaps, dependency)
-  }
-
-  /**
-   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
-   * Called on executors by reduce tasks.
-   */
-  override def getReader[K, C](
-      handle: ShuffleHandle,
-      startPartition: Int,
-      endPartition: Int,
-      context: TaskContext): ShuffleReader[K, C] = {
-    new BlockStoreShuffleReader(
-      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
-  }
-
-  /** Get a writer for a given partition. Called on executors by map tasks. */
-  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
-      : ShuffleWriter[K, V] = {
-    new HashShuffleWriter(
-      shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
-  }
-
-  /** Remove a shuffle's metadata from the ShuffleManager. */
-  override def unregisterShuffle(shuffleId: Int): Boolean = {
-    shuffleBlockResolver.removeShuffle(shuffleId)
-  }
-
-  override def shuffleBlockResolver: FileShuffleBlockResolver = {
-    fileShuffleBlockResolver
-  }
-
-  /** Shut down this ShuffleManager. */
-  override def stop(): Unit = {
-    shuffleBlockResolver.stop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
deleted file mode 100644
index 6c4444f..0000000
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.hash
-
-import java.io.IOException
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.shuffle._
-import org.apache.spark.storage.DiskBlockObjectWriter
-
-private[spark] class HashShuffleWriter[K, V](
-    shuffleBlockResolver: FileShuffleBlockResolver,
-    handle: BaseShuffleHandle[K, V, _],
-    mapId: Int,
-    context: TaskContext)
-  extends ShuffleWriter[K, V] with Logging {
-
-  private val dep = handle.dependency
-  private val numOutputSplits = dep.partitioner.numPartitions
-  private val metrics = context.taskMetrics
-
-  // Are we in the process of stopping? Because map tasks can call stop() with success = true
-  // and then call stop() with success = false if they get an exception, we want to make sure
-  // we don't try deleting files, etc twice.
-  private var stopping = false
-
-  private val writeMetrics = metrics.shuffleWriteMetrics
-
-  private val blockManager = SparkEnv.get.blockManager
-  private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits,
-    dep.serializer, writeMetrics)
-
-  /** Write a bunch of records to this task's output */
-  override def write(records: Iterator[Product2[K, V]]): Unit = {
-    val iter = if (dep.aggregator.isDefined) {
-      if (dep.mapSideCombine) {
-        dep.aggregator.get.combineValuesByKey(records, context)
-      } else {
-        records
-      }
-    } else {
-      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
-      records
-    }
-
-    for (elem <- iter) {
-      val bucketId = dep.partitioner.getPartition(elem._1)
-      shuffle.writers(bucketId).write(elem._1, elem._2)
-    }
-  }
-
-  /** Close this writer, passing along whether the map completed */
-  override def stop(initiallySuccess: Boolean): Option[MapStatus] = {
-    var success = initiallySuccess
-    try {
-      if (stopping) {
-        return None
-      }
-      stopping = true
-      if (success) {
-        try {
-          Some(commitWritesAndBuildStatus())
-        } catch {
-          case e: Exception =>
-            success = false
-            revertWrites()
-            throw e
-        }
-      } else {
-        revertWrites()
-        None
-      }
-    } finally {
-      // Release the writers back to the shuffle block manager.
-      if (shuffle != null && shuffle.writers != null) {
-        try {
-          shuffle.releaseWriters(success)
-        } catch {
-          case e: Exception => logError("Failed to release shuffle writers", e)
-        }
-      }
-    }
-  }
-
-  private def commitWritesAndBuildStatus(): MapStatus = {
-    // Commit the writes. Get the size of each bucket block (total block size).
-    val sizes: Array[Long] = shuffle.writers.map { writer: DiskBlockObjectWriter =>
-      writer.commitAndClose()
-      writer.fileSegment().length
-    }
-    // rename all shuffle files to final paths
-    // Note: there is only one ShuffleBlockResolver in executor
-    shuffleBlockResolver.synchronized {
-      shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
-        val output = blockManager.diskBlockManager.getFile(writer.blockId)
-        if (sizes(i) > 0) {
-          if (output.exists()) {
-            // Use length of existing file and delete our own temporary one
-            sizes(i) = output.length()
-            writer.file.delete()
-          } else {
-            // Commit by renaming our temporary file to something the fetcher expects
-            if (!writer.file.renameTo(output)) {
-              throw new IOException(s"fail to rename ${writer.file} to $output")
-            }
-          }
-        } else {
-          if (output.exists()) {
-            output.delete()
-          }
-        }
-      }
-    }
-    MapStatus(blockManager.shuffleServerId, sizes)
-  }
-
-  private def revertWrites(): Unit = {
-    if (shuffle != null && shuffle.writers != null) {
-      for (writer <- shuffle.writers) {
-        writer.revertPartialWritesAndClose()
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index f981505..69ff6c7 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -30,7 +30,6 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage._
 
@@ -39,7 +38,7 @@ import org.apache.spark.storage._
  * suitable for cleaner tests and provides some utility functions. Subclasses can use different
  * config options, in particular, a different shuffle manager class
  */
-abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[HashShuffleManager])
+abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
   extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
 {
   implicit val defaultTimeout = timeout(10000 millis)
@@ -354,84 +353,6 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
 
 
 /**
- * A copy of the shuffle tests for sort-based shuffle
- */
-class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[SortShuffleManager]) {
-  test("cleanup shuffle") {
-    val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
-    val collected = rdd.collect().toList
-    val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
-
-    // Explicit cleanup
-    shuffleDeps.foreach(s => cleaner.doCleanupShuffle(s.shuffleId, blocking = true))
-    tester.assertCleanup()
-
-    // Verify that shuffles can be re-executed after cleaning up
-    assert(rdd.collect().toList.equals(collected))
-  }
-
-  test("automatically cleanup shuffle") {
-    var rdd = newShuffleRDD()
-    rdd.count()
-
-    // Test that GC does not cause shuffle cleanup due to a strong reference
-    val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
-    runGC()
-    intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
-    }
-    rdd.count()  // Defeat early collection by the JVM
-
-    // Test that GC causes shuffle cleanup after dereferencing the RDD
-    val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
-    rdd = null  // Make RDD out of scope, so that corresponding shuffle goes out of scope
-    runGC()
-    postGCTester.assertCleanup()
-  }
-
-  test("automatically cleanup RDD + shuffle + broadcast in distributed mode") {
-    sc.stop()
-
-    val conf2 = new SparkConf()
-      .setMaster("local-cluster[2, 1, 1024]")
-      .setAppName("ContextCleanerSuite")
-      .set("spark.cleaner.referenceTracking.blocking", "true")
-      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
-      .set("spark.shuffle.manager", shuffleManager.getName)
-    sc = new SparkContext(conf2)
-
-    val numRdds = 10
-    val numBroadcasts = 4 // Broadcasts are more costly
-    val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
-    val broadcastBuffer = (1 to numBroadcasts).map(i => newBroadcast).toBuffer
-    val rddIds = sc.persistentRdds.keys.toSeq
-    val shuffleIds = 0 until sc.newShuffleId()
-    val broadcastIds = broadcastBuffer.map(_.id)
-
-    val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
-    runGC()
-    intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
-    }
-
-    // Test that GC triggers the cleanup of all variables after the dereferencing them
-    val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
-    broadcastBuffer.clear()
-    rddBuffer.clear()
-    runGC()
-    postGCTester.assertCleanup()
-
-    // Make sure the broadcasted task closure no longer exists after GC.
-    val taskClosureBroadcastId = broadcastIds.max + 1
-    assert(sc.env.blockManager.master.getMatchingBlockIds({
-      case BroadcastBlockId(`taskClosureBroadcastId`, _) => true
-      case _ => false
-    }, askSlaves = true).isEmpty)
-  }
-}
-
-
-/**
  * Class to test whether RDDs, shuffles, etc. have been successfully cleaned.
  * The checkpoint here refers only to normal (reliable) checkpoints, not local checkpoints.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala b/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
deleted file mode 100644
index 1079423..0000000
--- a/core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.scalatest.BeforeAndAfterAll
-
-class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
-
-  // This test suite should run all tests in ShuffleSuite with hash-based shuffle.
-
-  override def beforeAll() {
-    super.beforeAll()
-    conf.set("spark.shuffle.manager", "hash")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d26df7e..d14728c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
 /** Testsuite that tests block replication in BlockManager */
@@ -44,7 +44,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
   private var master: BlockManagerMaster = null
   private val securityMgr = new SecurityManager(conf)
   private val mapOutputTracker = new MapOutputTrackerMaster(conf)
-  private val shuffleManager = new HashShuffleManager(conf)
+  private val shuffleManager = new SortShuffleManager(conf)
 
   // List of block manager created during an unit test, so that all of the them can be stopped
   // after the unit test.

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a1c2933..db1efaf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -60,7 +60,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   var master: BlockManagerMaster = null
   val securityMgr = new SecurityManager(new SparkConf(false))
   val mapOutputTracker = new MapOutputTrackerMaster(new SparkConf(false))
-  val shuffleManager = new HashShuffleManager(new SparkConf(false))
+  val shuffleManager = new SortShuffleManager(new SparkConf(false))
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index dc3185a..2410118 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -237,7 +237,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
   private def testSimpleSpilling(codec: Option[String] = None): Unit = {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true, codec)  // Load defaults for Spark home
-    conf.set("spark.shuffle.manager", "hash") // avoid using external sorter
     conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
 
@@ -401,7 +400,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
   test("external aggregation updates peak execution memory") {
     val spillThreshold = 1000
     val conf = createSparkConf(loadDefaults = false)
-      .set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
       .set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString)
     sc = new SparkContext("local", "test", conf)
     // No spilling

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 16d5be6..6512e16 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -456,15 +456,6 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.shuffle.manager</code></td>
-  <td>sort</td>
-  <td>
-    Implementation to use for shuffling data. There are two implementations available:
-    <code>sort</code> and <code>hash</code>.
-    Sort-based shuffle is more memory-efficient and is the default option starting in 1.2.
-  </td>
-</tr>
-<tr>
   <td><code>spark.shuffle.service.enabled</code></td>
   <td>false</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index 7e35db7..d7deac9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -22,7 +22,6 @@ import java.util.Random
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
@@ -179,9 +178,6 @@ object ShuffleExchange {
         // copy.
         true
       }
-    } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
-      // We're using hash-based shuffle, so we don't need to copy.
-      false
     } else {
       // Catch-all case to safely handle any future ShuffleManager implementations.
       true

http://git-wip-us.apache.org/repos/asf/spark/blob/5e92583d/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 39d0de5..4be4882 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver._
 import org.apache.spark.streaming.util._
@@ -58,7 +58,7 @@ class ReceivedBlockHandlerSuite
   val streamId = 1
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
-  val shuffleManager = new HashShuffleManager(conf)
+  val shuffleManager = new SortShuffleManager(conf)
   val serializer = new KryoSerializer(conf)
   var serializerManager = new SerializerManager(serializer, conf)
   val manualClock = new ManualClock


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org