You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/10/22 18:46:42 UTC

[4/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations

[SPARK-10708] Consolidate sort shuffle implementations

There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Given that these now provide the same set of functionality, now that UnsafeShuffleManager supports large records, I think that we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and should merge the two managers together.

Author: Josh Rosen <jo...@databricks.com>

Closes #8829 from JoshRosen/consolidate-sort-shuffle-implementations.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6d06adf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6d06adf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6d06adf

Branch: refs/heads/master
Commit: f6d06adf05afa9c5386dc2396c94e7a98730289f
Parents: 94e2064
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Oct 22 09:46:30 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Oct 22 09:46:30 2015 -0700

----------------------------------------------------------------------
 .../sort/BypassMergeSortShuffleWriter.java      | 106 +++-
 .../spark/shuffle/sort/PackedRecordPointer.java |  92 +++
 .../shuffle/sort/ShuffleExternalSorter.java     | 491 ++++++++++++++++
 .../shuffle/sort/ShuffleInMemorySorter.java     | 124 ++++
 .../shuffle/sort/ShuffleSortDataFormat.java     |  67 +++
 .../shuffle/sort/SortShuffleFileWriter.java     |  53 --
 .../apache/spark/shuffle/sort/SpillInfo.java    |  37 ++
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 489 ++++++++++++++++
 .../shuffle/unsafe/PackedRecordPointer.java     |  92 ---
 .../apache/spark/shuffle/unsafe/SpillInfo.java  |  37 --
 .../unsafe/UnsafeShuffleExternalSorter.java     | 479 ----------------
 .../unsafe/UnsafeShuffleInMemorySorter.java     | 124 ----
 .../unsafe/UnsafeShuffleSortDataFormat.java     |  67 ---
 .../shuffle/unsafe/UnsafeShuffleWriter.java     | 489 ----------------
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../spark/shuffle/sort/SortShuffleManager.scala | 175 +++++-
 .../spark/shuffle/sort/SortShuffleWriter.scala  |  28 +-
 .../shuffle/unsafe/UnsafeShuffleManager.scala   | 202 -------
 .../spark/util/collection/ChainedBuffer.scala   | 146 -----
 .../spark/util/collection/ExternalSorter.scala  |  35 +-
 .../PartitionedSerializedPairBuffer.scala       | 273 ---------
 .../shuffle/sort/PackedRecordPointerSuite.java  | 102 ++++
 .../sort/ShuffleInMemorySorterSuite.java        | 124 ++++
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  | 560 +++++++++++++++++++
 .../unsafe/PackedRecordPointerSuite.java        | 101 ----
 .../UnsafeShuffleInMemorySorterSuite.java       | 124 ----
 .../unsafe/UnsafeShuffleWriterSuite.java        | 560 -------------------
 .../org/apache/spark/SortShuffleSuite.scala     |  65 +++
 .../spark/scheduler/DAGSchedulerSuite.scala     |   6 +-
 .../BypassMergeSortShuffleWriterSuite.scala     |  64 ++-
 .../shuffle/sort/SortShuffleManagerSuite.scala  | 131 +++++
 .../shuffle/sort/SortShuffleWriterSuite.scala   |  45 --
 .../unsafe/UnsafeShuffleManagerSuite.scala      | 129 -----
 .../shuffle/unsafe/UnsafeShuffleSuite.scala     | 102 ----
 .../util/collection/ChainedBufferSuite.scala    | 144 -----
 .../PartitionedSerializedPairBufferSuite.scala  | 148 -----
 docs/configuration.md                           |   7 +-
 project/MimaExcludes.scala                      |   9 +-
 .../apache/spark/sql/execution/Exchange.scala   |  23 +-
 .../execution/UnsafeRowSerializerSuite.scala    |   9 +-
 40 files changed, 2600 insertions(+), 3461 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index f5d80bb..ee82d67 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -21,21 +21,30 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import javax.annotation.Nullable;
 
+import scala.None$;
+import scala.Option;
 import scala.Product2;
 import scala.Tuple2;
 import scala.collection.Iterator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Closeables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.Partitioner;
+import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.IndexShuffleBlockResolver;
+import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.storage.*;
 import org.apache.spark.util.Utils;
 
@@ -62,7 +71,7 @@ import org.apache.spark.util.Utils;
  * <p>
  * There have been proposals to completely remove this code path; see SPARK-6026 for details.
  */
-final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<K, V> {
+final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
 
   private final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
 
@@ -72,31 +81,52 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
   private final BlockManager blockManager;
   private final Partitioner partitioner;
   private final ShuffleWriteMetrics writeMetrics;
+  private final int shuffleId;
+  private final int mapId;
   private final Serializer serializer;
+  private final IndexShuffleBlockResolver shuffleBlockResolver;
 
   /** Array of file writers, one for each partition */
   private DiskBlockObjectWriter[] partitionWriters;
+  @Nullable private MapStatus mapStatus;
+  private long[] partitionLengths;
+
+  /**
+   * Are we in the process of stopping? Because map tasks can call stop() with success = true
+   * and then call stop() with success = false if they get an exception, we want to make sure
+   * we don't try deleting files, etc twice.
+   */
+  private boolean stopping = false;
 
   public BypassMergeSortShuffleWriter(
-      SparkConf conf,
       BlockManager blockManager,
-      Partitioner partitioner,
-      ShuffleWriteMetrics writeMetrics,
-      Serializer serializer) {
+      IndexShuffleBlockResolver shuffleBlockResolver,
+      BypassMergeSortShuffleHandle<K, V> handle,
+      int mapId,
+      TaskContext taskContext,
+      SparkConf conf) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
     this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
-    this.numPartitions = partitioner.numPartitions();
     this.blockManager = blockManager;
-    this.partitioner = partitioner;
-    this.writeMetrics = writeMetrics;
-    this.serializer = serializer;
+    final ShuffleDependency<K, V, V> dep = handle.dependency();
+    this.mapId = mapId;
+    this.shuffleId = dep.shuffleId();
+    this.partitioner = dep.partitioner();
+    this.numPartitions = partitioner.numPartitions();
+    this.writeMetrics = new ShuffleWriteMetrics();
+    taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+    this.serializer = Serializer.getSerializer(dep.serializer());
+    this.shuffleBlockResolver = shuffleBlockResolver;
   }
 
   @Override
-  public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
+  public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
     if (!records.hasNext()) {
+      partitionLengths = new long[numPartitions];
+      shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
       return;
     }
     final SerializerInstance serInstance = serializer.newInstance();
@@ -124,13 +154,24 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
     for (DiskBlockObjectWriter writer : partitionWriters) {
       writer.commitAndClose();
     }
+
+    partitionLengths =
+      writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
+    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
 
-  @Override
-  public long[] writePartitionedFile(
-      BlockId blockId,
-      TaskContext context,
-      File outputFile) throws IOException {
+  @VisibleForTesting
+  long[] getPartitionLengths() {
+    return partitionLengths;
+  }
+
+  /**
+   * Concatenate all of the per-partition files into a single combined file.
+   *
+   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
+   */
+  private long[] writePartitionedFile(File outputFile) throws IOException {
     // Track location of the partition starts in the output file
     final long[] lengths = new long[numPartitions];
     if (partitionWriters == null) {
@@ -165,18 +206,33 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
   }
 
   @Override
-  public void stop() throws IOException {
-    if (partitionWriters != null) {
-      try {
-        for (DiskBlockObjectWriter writer : partitionWriters) {
-          // This method explicitly does _not_ throw exceptions:
-          File file = writer.revertPartialWritesAndClose();
-          if (!file.delete()) {
-            logger.error("Error while deleting file {}", file.getAbsolutePath());
+  public Option<MapStatus> stop(boolean success) {
+    if (stopping) {
+      return None$.empty();
+    } else {
+      stopping = true;
+      if (success) {
+        if (mapStatus == null) {
+          throw new IllegalStateException("Cannot call stop(true) without having called write()");
+        }
+        return Option.apply(mapStatus);
+      } else {
+        // The map task failed, so delete our output data.
+        if (partitionWriters != null) {
+          try {
+            for (DiskBlockObjectWriter writer : partitionWriters) {
+              // This method explicitly does _not_ throw exceptions:
+              File file = writer.revertPartialWritesAndClose();
+              if (!file.delete()) {
+                logger.error("Error while deleting file {}", file.getAbsolutePath());
+              }
+            }
+          } finally {
+            partitionWriters = null;
           }
         }
-      } finally {
-        partitionWriters = null;
+        shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
+        return None$.empty();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/PackedRecordPointer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/PackedRecordPointer.java b/core/src/main/java/org/apache/spark/shuffle/sort/PackedRecordPointer.java
new file mode 100644
index 0000000..c117119
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/PackedRecordPointer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+/**
+ * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
+ * <p>
+ * Within the long, the data is laid out as follows:
+ * <pre>
+ *   [24 bit partition number][13 bit memory page number][27 bit offset in page]
+ * </pre>
+ * This implies that the maximum addressable page size is 2^27 bits = 128 megabytes, assuming that
+ * our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
+ * 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
+ * implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
+ * <p>
+ * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
+ * optimization to future work as it will require more careful design to ensure that addresses are
+ * properly aligned (e.g. by padding records).
+ */
+final class PackedRecordPointer {
+
+  static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes
+
+  /**
+   * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
+   */
+  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;  // 16777215
+
+  /** Bit mask for the lower 40 bits of a long. */
+  private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;
+
+  /** Bit mask for the upper 24 bits of a long */
+  private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;
+
+  /** Bit mask for the lower 27 bits of a long. */
+  private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;
+
+  /** Bit mask for the lower 51 bits of a long. */
+  private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;
+
+  /** Bit mask for the upper 13 bits of a long */
+  private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
+
+  /**
+   * Pack a record address and partition id into a single word.
+   *
+   * @param recordPointer a record pointer encoded by TaskMemoryManager.
+   * @param partitionId a shuffle partition id (maximum value of 2^24).
+   * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
+   */
+  public static long packPointer(long recordPointer, int partitionId) {
+    assert (partitionId <= MAXIMUM_PARTITION_ID);
+    // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
+    // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
+    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
+    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
+    return (((long) partitionId) << 40) | compressedAddress;
+  }
+
+  private long packedRecordPointer;
+
+  public void set(long packedRecordPointer) {
+    this.packedRecordPointer = packedRecordPointer;
+  }
+
+  public int getPartitionId() {
+    return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
+  }
+
+  public long getRecordPointer() {
+    final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
+    final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
+    return pageNumber | offsetInPage;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
new file mode 100644
index 0000000..85fdaa8
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+
+import scala.Tuple2;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.serializer.DummySerializerInstance;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleMemoryManager;
+import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.DiskBlockObjectWriter;
+import org.apache.spark.storage.TempShuffleBlockId;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import org.apache.spark.util.Utils;
+
+/**
+ * An external sorter that is specialized for sort-based shuffle.
+ * <p>
+ * Incoming records are appended to data pages. When all records have been inserted (or when the
+ * current thread's shuffle memory limit is reached), the in-memory records are sorted according to
+ * their partition ids (using a {@link ShuffleInMemorySorter}). The sorted records are then
+ * written to a single output file (or multiple files, if we've spilled). The format of the output
+ * files is the same as the format of the final output file written by
+ * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
+ * written as a single serialized, compressed stream that can be read with a new decompression and
+ * deserialization stream.
+ * <p>
+ * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
+ * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
+ * specialized merge procedure that avoids extra serialization/deserialization.
+ */
+final class ShuffleExternalSorter {
+
+  private final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);
+
+  @VisibleForTesting
+  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
+
+  private final int initialSize;
+  private final int numPartitions;
+  private final int pageSizeBytes;
+  @VisibleForTesting
+  final int maxRecordSizeBytes;
+  private final TaskMemoryManager taskMemoryManager;
+  private final ShuffleMemoryManager shuffleMemoryManager;
+  private final BlockManager blockManager;
+  private final TaskContext taskContext;
+  private final ShuffleWriteMetrics writeMetrics;
+  private long numRecordsInsertedSinceLastSpill = 0;
+
+  /** Force this sorter to spill when there are this many elements in memory. For testing only */
+  private final long numElementsForSpillThreshold;
+
+  /** The buffer size to use when writing spills using DiskBlockObjectWriter */
+  private final int fileBufferSizeBytes;
+
+  /**
+   * Memory pages that hold the records being sorted. The pages in this list are freed when
+   * spilling, although in principle we could recycle these pages across spills (on the other hand,
+   * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
+   * itself).
+   */
+  private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();
+
+  private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
+
+  /** Peak memory used by this sorter so far, in bytes. **/
+  private long peakMemoryUsedBytes;
+
+  // These variables are reset after spilling:
+  @Nullable private ShuffleInMemorySorter inMemSorter;
+  @Nullable private MemoryBlock currentPage = null;
+  private long currentPagePosition = -1;
+  private long freeSpaceInCurrentPage = 0;
+
+  public ShuffleExternalSorter(
+      TaskMemoryManager memoryManager,
+      ShuffleMemoryManager shuffleMemoryManager,
+      BlockManager blockManager,
+      TaskContext taskContext,
+      int initialSize,
+      int numPartitions,
+      SparkConf conf,
+      ShuffleWriteMetrics writeMetrics) throws IOException {
+    this.taskMemoryManager = memoryManager;
+    this.shuffleMemoryManager = shuffleMemoryManager;
+    this.blockManager = blockManager;
+    this.taskContext = taskContext;
+    this.initialSize = initialSize;
+    this.peakMemoryUsedBytes = initialSize;
+    this.numPartitions = numPartitions;
+    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+    this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.numElementsForSpillThreshold =
+      conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
+    this.pageSizeBytes = (int) Math.min(
+      PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
+    this.maxRecordSizeBytes = pageSizeBytes - 4;
+    this.writeMetrics = writeMetrics;
+    initializeForWriting();
+
+    // preserve first page to ensure that we have at least one page to work with. Otherwise,
+    // other operators in the same task may starve this sorter (SPARK-9709).
+    acquireNewPageIfNecessary(pageSizeBytes);
+  }
+
+  /**
+   * Allocates new sort data structures. Called when creating the sorter and after each spill.
+   */
+  private void initializeForWriting() throws IOException {
+    // TODO: move this sizing calculation logic into a static method of sorter:
+    final long memoryRequested = initialSize * 8L;
+    final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
+    if (memoryAcquired != memoryRequested) {
+      shuffleMemoryManager.release(memoryAcquired);
+      throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
+    }
+
+    this.inMemSorter = new ShuffleInMemorySorter(initialSize);
+    numRecordsInsertedSinceLastSpill = 0;
+  }
+
+  /**
+   * Sorts the in-memory records and writes the sorted records to an on-disk file.
+   * This method does not free the sort data structures.
+   *
+   * @param isLastFile if true, this indicates that we're writing the final output file and that the
+   *                   bytes written should be counted towards shuffle spill metrics rather than
+   *                   shuffle write metrics.
+   */
+  private void writeSortedFile(boolean isLastFile) throws IOException {
+
+    final ShuffleWriteMetrics writeMetricsToUse;
+
+    if (isLastFile) {
+      // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
+      writeMetricsToUse = writeMetrics;
+    } else {
+      // We're spilling, so bytes written should be counted towards spill rather than write.
+      // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
+      // them towards shuffle bytes written.
+      writeMetricsToUse = new ShuffleWriteMetrics();
+    }
+
+    // This call performs the actual sort.
+    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
+      inMemSorter.getSortedIterator();
+
+    // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
+    // after SPARK-5581 is fixed.
+    DiskBlockObjectWriter writer;
+
+    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
+    // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
+    // data through a byte array. This array does not need to be large enough to hold a single
+    // record;
+    final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+
+    // Because this output will be read during shuffle, its compression codec must be controlled by
+    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+    // createTempShuffleBlock here; see SPARK-3426 for more details.
+    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
+      blockManager.diskBlockManager().createTempShuffleBlock();
+    final File file = spilledFileInfo._2();
+    final TempShuffleBlockId blockId = spilledFileInfo._1();
+    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
+
+    // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
+    // Our write path doesn't actually use this serializer (since we end up calling the `write()`
+    // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
+    // around this, we pass a dummy no-op serializer.
+    final SerializerInstance ser = DummySerializerInstance.INSTANCE;
+
+    writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
+
+    int currentPartition = -1;
+    while (sortedRecords.hasNext()) {
+      sortedRecords.loadNext();
+      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
+      assert (partition >= currentPartition);
+      if (partition != currentPartition) {
+        // Switch to the new partition
+        if (currentPartition != -1) {
+          writer.commitAndClose();
+          spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
+        }
+        currentPartition = partition;
+        writer =
+          blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
+      }
+
+      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
+      final Object recordPage = taskMemoryManager.getPage(recordPointer);
+      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
+      int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
+      long recordReadPosition = recordOffsetInPage + 4; // skip over record length
+      while (dataRemaining > 0) {
+        final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
+        Platform.copyMemory(
+          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
+        writer.write(writeBuffer, 0, toTransfer);
+        recordReadPosition += toTransfer;
+        dataRemaining -= toTransfer;
+      }
+      writer.recordWritten();
+    }
+
+    if (writer != null) {
+      writer.commitAndClose();
+      // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
+      // then the file might be empty. Note that it might be better to avoid calling
+      // writeSortedFile() in that case.
+      if (currentPartition != -1) {
+        spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
+        spills.add(spillInfo);
+      }
+    }
+
+    if (!isLastFile) {  // i.e. this is a spill file
+      // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
+      // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
+      // relies on its `recordWritten()` method being called in order to trigger periodic updates to
+      // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
+      // counter at a higher-level, then the in-progress metrics for records written and bytes
+      // written would get out of sync.
+      //
+      // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
+      // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
+      // metrics to the true write metrics here. The reason for performing this copying is so that
+      // we can avoid reporting spilled bytes as shuffle write bytes.
+      //
+      // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
+      // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
+      // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
+      writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+    }
+  }
+
+  /**
+   * Sort and spill the current records in response to memory pressure.
+   */
+  @VisibleForTesting
+  void spill() throws IOException {
+    logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
+      Thread.currentThread().getId(),
+      Utils.bytesToString(getMemoryUsage()),
+      spills.size(),
+      spills.size() > 1 ? " times" : " time");
+
+    writeSortedFile(false);
+    final long inMemSorterMemoryUsage = inMemSorter.getMemoryUsage();
+    inMemSorter = null;
+    shuffleMemoryManager.release(inMemSorterMemoryUsage);
+    final long spillSize = freeMemory();
+    taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+
+    initializeForWriting();
+  }
+
+  private long getMemoryUsage() {
+    long totalPageSize = 0;
+    for (MemoryBlock page : allocatedPages) {
+      totalPageSize += page.size();
+    }
+    return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
+  }
+
+  private void updatePeakMemoryUsed() {
+    long mem = getMemoryUsage();
+    if (mem > peakMemoryUsedBytes) {
+      peakMemoryUsedBytes = mem;
+    }
+  }
+
+  /**
+   * Return the peak memory used so far, in bytes.
+   */
+  long getPeakMemoryUsedBytes() {
+    updatePeakMemoryUsed();
+    return peakMemoryUsedBytes;
+  }
+
+  private long freeMemory() {
+    updatePeakMemoryUsed();
+    long memoryFreed = 0;
+    for (MemoryBlock block : allocatedPages) {
+      taskMemoryManager.freePage(block);
+      shuffleMemoryManager.release(block.size());
+      memoryFreed += block.size();
+    }
+    allocatedPages.clear();
+    currentPage = null;
+    currentPagePosition = -1;
+    freeSpaceInCurrentPage = 0;
+    return memoryFreed;
+  }
+
+  /**
+   * Force all memory and spill files to be deleted; called by shuffle error-handling code.
+   */
+  public void cleanupResources() {
+    freeMemory();
+    for (SpillInfo spill : spills) {
+      if (spill.file.exists() && !spill.file.delete()) {
+        logger.error("Unable to delete spill file {}", spill.file.getPath());
+      }
+    }
+    if (inMemSorter != null) {
+      shuffleMemoryManager.release(inMemSorter.getMemoryUsage());
+      inMemSorter = null;
+    }
+  }
+
+  /**
+   * Checks whether there is enough space to insert an additional record in to the sort pointer
+   * array and grows the array if additional space is required. If the required space cannot be
+   * obtained, then the in-memory data will be spilled to disk.
+   */
+  private void growPointerArrayIfNecessary() throws IOException {
+    assert(inMemSorter != null);
+    if (!inMemSorter.hasSpaceForAnotherRecord()) {
+      logger.debug("Attempting to expand sort pointer array");
+      final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
+      final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
+      final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
+      if (memoryAcquired < memoryToGrowPointerArray) {
+        shuffleMemoryManager.release(memoryAcquired);
+        spill();
+      } else {
+        inMemSorter.expandPointerArray();
+        shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
+      }
+    }
+  }
+  
+  /**
+   * Allocates more memory in order to insert an additional record. This will request additional
+   * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
+   * obtained.
+   *
+   * @param requiredSpace the required space in the data page, in bytes, including space for storing
+   *                      the record size. This must be less than or equal to the page size (records
+   *                      that exceed the page size are handled via a different code path which uses
+   *                      special overflow pages).
+   */
+  private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
+    growPointerArrayIfNecessary();
+    if (requiredSpace > freeSpaceInCurrentPage) {
+      logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
+        freeSpaceInCurrentPage);
+      // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
+      // without using the free space at the end of the current page. We should also do this for
+      // BytesToBytesMap.
+      if (requiredSpace > pageSizeBytes) {
+        throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
+          pageSizeBytes + ")");
+      } else {
+        final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+        if (memoryAcquired < pageSizeBytes) {
+          shuffleMemoryManager.release(memoryAcquired);
+          spill();
+          final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+          if (memoryAcquiredAfterSpilling != pageSizeBytes) {
+            shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
+            throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
+          }
+        }
+        currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
+        currentPagePosition = currentPage.getBaseOffset();
+        freeSpaceInCurrentPage = pageSizeBytes;
+        allocatedPages.add(currentPage);
+      }
+    }
+  }
+
+  /**
+   * Write a record to the shuffle sorter.
+   */
+  public void insertRecord(
+      Object recordBaseObject,
+      long recordBaseOffset,
+      int lengthInBytes,
+      int partitionId) throws IOException {
+
+    if (numRecordsInsertedSinceLastSpill > numElementsForSpillThreshold) {
+      spill();
+    }
+
+    growPointerArrayIfNecessary();
+    // Need 4 bytes to store the record length.
+    final int totalSpaceRequired = lengthInBytes + 4;
+
+    // --- Figure out where to insert the new record ----------------------------------------------
+
+    final MemoryBlock dataPage;
+    long dataPagePosition;
+    boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
+    if (useOverflowPage) {
+      long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
+      // The record is larger than the page size, so allocate a special overflow page just to hold
+      // that record.
+      final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+      if (memoryGranted != overflowPageSize) {
+        shuffleMemoryManager.release(memoryGranted);
+        spill();
+        final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+        if (memoryGrantedAfterSpill != overflowPageSize) {
+          shuffleMemoryManager.release(memoryGrantedAfterSpill);
+          throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
+        }
+      }
+      MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
+      allocatedPages.add(overflowPage);
+      dataPage = overflowPage;
+      dataPagePosition = overflowPage.getBaseOffset();
+    } else {
+      // The record is small enough to fit in a regular data page, but the current page might not
+      // have enough space to hold it (or no pages have been allocated yet).
+      acquireNewPageIfNecessary(totalSpaceRequired);
+      dataPage = currentPage;
+      dataPagePosition = currentPagePosition;
+      // Update bookkeeping information
+      freeSpaceInCurrentPage -= totalSpaceRequired;
+      currentPagePosition += totalSpaceRequired;
+    }
+    final Object dataPageBaseObject = dataPage.getBaseObject();
+
+    final long recordAddress =
+      taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
+    Platform.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
+    dataPagePosition += 4;
+    Platform.copyMemory(
+      recordBaseObject, recordBaseOffset, dataPageBaseObject, dataPagePosition, lengthInBytes);
+    assert(inMemSorter != null);
+    inMemSorter.insertRecord(recordAddress, partitionId);
+    numRecordsInsertedSinceLastSpill += 1;
+  }
+
+  /**
+   * Close the sorter, causing any buffered data to be sorted and written out to disk.
+   *
+   * @return metadata for the spill files written by this sorter. If no records were ever inserted
+   *         into this sorter, then this will return an empty array.
+   * @throws IOException
+   */
+  public SpillInfo[] closeAndGetSpills() throws IOException {
+    try {
+      if (inMemSorter != null) {
+        // Do not count the final file towards the spill count.
+        writeSortedFile(true);
+        freeMemory();
+      }
+      return spills.toArray(new SpillInfo[spills.size()]);
+    } catch (IOException e) {
+      cleanupResources();
+      throw e;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
new file mode 100644
index 0000000..a8dee6c
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import java.util.Comparator;
+
+import org.apache.spark.util.collection.Sorter;
+
+final class ShuffleInMemorySorter {
+
+  private final Sorter<PackedRecordPointer, long[]> sorter;
+  private static final class SortComparator implements Comparator<PackedRecordPointer> {
+    @Override
+    public int compare(PackedRecordPointer left, PackedRecordPointer right) {
+      return left.getPartitionId() - right.getPartitionId();
+    }
+  }
+  private static final SortComparator SORT_COMPARATOR = new SortComparator();
+
+  /**
+   * An array of record pointers and partition ids that have been encoded by
+   * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating
+   * records.
+   */
+  private long[] pointerArray;
+
+  /**
+   * The position in the pointer array where new records can be inserted.
+   */
+  private int pointerArrayInsertPosition = 0;
+
+  public ShuffleInMemorySorter(int initialSize) {
+    assert (initialSize > 0);
+    this.pointerArray = new long[initialSize];
+    this.sorter = new Sorter<PackedRecordPointer, long[]>(ShuffleSortDataFormat.INSTANCE);
+  }
+
+  public void expandPointerArray() {
+    final long[] oldArray = pointerArray;
+    // Guard against overflow:
+    final int newLength = oldArray.length * 2 > 0 ? (oldArray.length * 2) : Integer.MAX_VALUE;
+    pointerArray = new long[newLength];
+    System.arraycopy(oldArray, 0, pointerArray, 0, oldArray.length);
+  }
+
+  public boolean hasSpaceForAnotherRecord() {
+    return pointerArrayInsertPosition + 1 < pointerArray.length;
+  }
+
+  public long getMemoryUsage() {
+    return pointerArray.length * 8L;
+  }
+
+  /**
+   * Inserts a record to be sorted.
+   *
+   * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
+   *                      certain pointer compression techniques used by the sorter, the sort can
+   *                      only operate on pointers that point to locations in the first
+   *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
+   * @param partitionId the partition id, which must be less than or equal to
+   *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
+   */
+  public void insertRecord(long recordPointer, int partitionId) {
+    if (!hasSpaceForAnotherRecord()) {
+      if (pointerArray.length == Integer.MAX_VALUE) {
+        throw new IllegalStateException("Sort pointer array has reached maximum size");
+      } else {
+        expandPointerArray();
+      }
+    }
+    pointerArray[pointerArrayInsertPosition] =
+        PackedRecordPointer.packPointer(recordPointer, partitionId);
+    pointerArrayInsertPosition++;
+  }
+
+  /**
+   * An iterator-like class that's used instead of Java's Iterator in order to facilitate inlining.
+   */
+  public static final class ShuffleSorterIterator {
+
+    private final long[] pointerArray;
+    private final int numRecords;
+    final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();
+    private int position = 0;
+
+    public ShuffleSorterIterator(int numRecords, long[] pointerArray) {
+      this.numRecords = numRecords;
+      this.pointerArray = pointerArray;
+    }
+
+    public boolean hasNext() {
+      return position < numRecords;
+    }
+
+    public void loadNext() {
+      packedRecordPointer.set(pointerArray[position]);
+      position++;
+    }
+  }
+
+  /**
+   * Return an iterator over record pointers in sorted order.
+   */
+  public ShuffleSorterIterator getSortedIterator() {
+    sorter.sort(pointerArray, 0, pointerArrayInsertPosition, SORT_COMPARATOR);
+    return new ShuffleSorterIterator(pointerArrayInsertPosition, pointerArray);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
new file mode 100644
index 0000000..8a1e5ae
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import org.apache.spark.util.collection.SortDataFormat;
+
+final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, long[]> {
+
+  public static final ShuffleSortDataFormat INSTANCE = new ShuffleSortDataFormat();
+
+  private ShuffleSortDataFormat() { }
+
+  @Override
+  public PackedRecordPointer getKey(long[] data, int pos) {
+    // Since we re-use keys, this method shouldn't be called.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PackedRecordPointer newKey() {
+    return new PackedRecordPointer();
+  }
+
+  @Override
+  public PackedRecordPointer getKey(long[] data, int pos, PackedRecordPointer reuse) {
+    reuse.set(data[pos]);
+    return reuse;
+  }
+
+  @Override
+  public void swap(long[] data, int pos0, int pos1) {
+    final long temp = data[pos0];
+    data[pos0] = data[pos1];
+    data[pos1] = temp;
+  }
+
+  @Override
+  public void copyElement(long[] src, int srcPos, long[] dst, int dstPos) {
+    dst[dstPos] = src[srcPos];
+  }
+
+  @Override
+  public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length) {
+    System.arraycopy(src, srcPos, dst, dstPos, length);
+  }
+
+  @Override
+  public long[] allocate(int length) {
+    return new long[length];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
deleted file mode 100644
index 656ea04..0000000
--- a/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.sort;
-
-import java.io.File;
-import java.io.IOException;
-
-import scala.Product2;
-import scala.collection.Iterator;
-
-import org.apache.spark.annotation.Private;
-import org.apache.spark.TaskContext;
-import org.apache.spark.storage.BlockId;
-
-/**
- * Interface for objects that {@link SortShuffleWriter} uses to write its output files.
- */
-@Private
-public interface SortShuffleFileWriter<K, V> {
-
-  void insertAll(Iterator<Product2<K, V>> records) throws IOException;
-
-  /**
-   * Write all the data added into this shuffle sorter into a file in the disk store. This is
-   * called by the SortShuffleWriter and can go through an efficient path of just concatenating
-   * binary files if we decided to avoid merge-sorting.
-   *
-   * @param blockId block ID to write to. The index file will be blockId.name + ".index".
-   * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
-   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
-   */
-  long[] writePartitionedFile(
-      BlockId blockId,
-      TaskContext context,
-      File outputFile) throws IOException;
-
-  void stop() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java b/core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java
new file mode 100644
index 0000000..df9f7b7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import java.io.File;
+
+import org.apache.spark.storage.TempShuffleBlockId;
+
+/**
+ * Metadata for a block of data written by {@link ShuffleExternalSorter}.
+ */
+final class SpillInfo {
+  final long[] partitionLengths;
+  final File file;
+  final TempShuffleBlockId blockId;
+
+  public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
+    this.partitionLengths = new long[numPartitions];
+    this.file = file;
+    this.blockId = blockId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
new file mode 100644
index 0000000..e8f050c
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import javax.annotation.Nullable;
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.util.Iterator;
+
+import scala.Option;
+import scala.Product2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Map;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.*;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.io.CompressionCodec;
+import org.apache.spark.io.CompressionCodec$;
+import org.apache.spark.io.LZFCompressionCodec;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.scheduler.MapStatus$;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.IndexShuffleBlockResolver;
+import org.apache.spark.shuffle.ShuffleMemoryManager;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.TimeTrackingOutputStream;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
+
+@Private
+public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
+
+  private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleWriter.class);
+
+  private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();
+
+  @VisibleForTesting
+  static final int INITIAL_SORT_BUFFER_SIZE = 4096;
+
+  private final BlockManager blockManager;
+  private final IndexShuffleBlockResolver shuffleBlockResolver;
+  private final TaskMemoryManager memoryManager;
+  private final ShuffleMemoryManager shuffleMemoryManager;
+  private final SerializerInstance serializer;
+  private final Partitioner partitioner;
+  private final ShuffleWriteMetrics writeMetrics;
+  private final int shuffleId;
+  private final int mapId;
+  private final TaskContext taskContext;
+  private final SparkConf sparkConf;
+  private final boolean transferToEnabled;
+
+  @Nullable private MapStatus mapStatus;
+  @Nullable private ShuffleExternalSorter sorter;
+  private long peakMemoryUsedBytes = 0;
+
+  /** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
+  private static final class MyByteArrayOutputStream extends ByteArrayOutputStream {
+    public MyByteArrayOutputStream(int size) { super(size); }
+    public byte[] getBuf() { return buf; }
+  }
+
+  private MyByteArrayOutputStream serBuffer;
+  private SerializationStream serOutputStream;
+
+  /**
+   * Are we in the process of stopping? Because map tasks can call stop() with success = true
+   * and then call stop() with success = false if they get an exception, we want to make sure
+   * we don't try deleting files, etc twice.
+   */
+  private boolean stopping = false;
+
+  public UnsafeShuffleWriter(
+      BlockManager blockManager,
+      IndexShuffleBlockResolver shuffleBlockResolver,
+      TaskMemoryManager memoryManager,
+      ShuffleMemoryManager shuffleMemoryManager,
+      SerializedShuffleHandle<K, V> handle,
+      int mapId,
+      TaskContext taskContext,
+      SparkConf sparkConf) throws IOException {
+    final int numPartitions = handle.dependency().partitioner().numPartitions();
+    if (numPartitions > SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE()) {
+      throw new IllegalArgumentException(
+        "UnsafeShuffleWriter can only be used for shuffles with at most " +
+          SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE() + " reduce partitions");
+    }
+    this.blockManager = blockManager;
+    this.shuffleBlockResolver = shuffleBlockResolver;
+    this.memoryManager = memoryManager;
+    this.shuffleMemoryManager = shuffleMemoryManager;
+    this.mapId = mapId;
+    final ShuffleDependency<K, V, V> dep = handle.dependency();
+    this.shuffleId = dep.shuffleId();
+    this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
+    this.partitioner = dep.partitioner();
+    this.writeMetrics = new ShuffleWriteMetrics();
+    taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+    this.taskContext = taskContext;
+    this.sparkConf = sparkConf;
+    this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
+    open();
+  }
+
+  @VisibleForTesting
+  public int maxRecordSizeBytes() {
+    assert(sorter != null);
+    return sorter.maxRecordSizeBytes;
+  }
+
+  private void updatePeakMemoryUsed() {
+    // sorter can be null if this writer is closed
+    if (sorter != null) {
+      long mem = sorter.getPeakMemoryUsedBytes();
+      if (mem > peakMemoryUsedBytes) {
+        peakMemoryUsedBytes = mem;
+      }
+    }
+  }
+
+  /**
+   * Return the peak memory used so far, in bytes.
+   */
+  public long getPeakMemoryUsedBytes() {
+    updatePeakMemoryUsed();
+    return peakMemoryUsedBytes;
+  }
+
+  /**
+   * This convenience method should only be called in test code.
+   */
+  @VisibleForTesting
+  public void write(Iterator<Product2<K, V>> records) throws IOException {
+    write(JavaConverters.asScalaIteratorConverter(records).asScala());
+  }
+
+  @Override
+  public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
+    // Keep track of success so we know if we encountered an exception
+    // We do this rather than a standard try/catch/re-throw to handle
+    // generic throwables.
+    boolean success = false;
+    try {
+      while (records.hasNext()) {
+        insertRecordIntoSorter(records.next());
+      }
+      closeAndWriteOutput();
+      success = true;
+    } finally {
+      if (sorter != null) {
+        try {
+          sorter.cleanupResources();
+        } catch (Exception e) {
+          // Only throw this error if we won't be masking another
+          // error.
+          if (success) {
+            throw e;
+          } else {
+            logger.error("In addition to a failure during writing, we failed during " +
+                         "cleanup.", e);
+          }
+        }
+      }
+    }
+  }
+
+  private void open() throws IOException {
+    assert (sorter == null);
+    sorter = new ShuffleExternalSorter(
+      memoryManager,
+      shuffleMemoryManager,
+      blockManager,
+      taskContext,
+      INITIAL_SORT_BUFFER_SIZE,
+      partitioner.numPartitions(),
+      sparkConf,
+      writeMetrics);
+    serBuffer = new MyByteArrayOutputStream(1024 * 1024);
+    serOutputStream = serializer.serializeStream(serBuffer);
+  }
+
+  @VisibleForTesting
+  void closeAndWriteOutput() throws IOException {
+    assert(sorter != null);
+    updatePeakMemoryUsed();
+    serBuffer = null;
+    serOutputStream = null;
+    final SpillInfo[] spills = sorter.closeAndGetSpills();
+    sorter = null;
+    final long[] partitionLengths;
+    try {
+      partitionLengths = mergeSpills(spills);
+    } finally {
+      for (SpillInfo spill : spills) {
+        if (spill.file.exists() && ! spill.file.delete()) {
+          logger.error("Error while deleting spill file {}", spill.file.getPath());
+        }
+      }
+    }
+    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+  }
+
+  @VisibleForTesting
+  void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
+    assert(sorter != null);
+    final K key = record._1();
+    final int partitionId = partitioner.getPartition(key);
+    serBuffer.reset();
+    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
+    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
+    serOutputStream.flush();
+
+    final int serializedRecordSize = serBuffer.size();
+    assert (serializedRecordSize > 0);
+
+    sorter.insertRecord(
+      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
+  }
+
+  @VisibleForTesting
+  void forceSorterToSpill() throws IOException {
+    assert (sorter != null);
+    sorter.spill();
+  }
+
+  /**
+   * Merge zero or more spill files together, choosing the fastest merging strategy based on the
+   * number of spills and the IO compression codec.
+   *
+   * @return the partition lengths in the merged file.
+   */
+  private long[] mergeSpills(SpillInfo[] spills) throws IOException {
+    final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+    final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
+    final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
+    final boolean fastMergeEnabled =
+      sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
+    final boolean fastMergeIsSupported =
+      !compressionEnabled || compressionCodec instanceof LZFCompressionCodec;
+    try {
+      if (spills.length == 0) {
+        new FileOutputStream(outputFile).close(); // Create an empty file
+        return new long[partitioner.numPartitions()];
+      } else if (spills.length == 1) {
+        // Here, we don't need to perform any metrics updates because the bytes written to this
+        // output file would have already been counted as shuffle bytes written.
+        Files.move(spills[0].file, outputFile);
+        return spills[0].partitionLengths;
+      } else {
+        final long[] partitionLengths;
+        // There are multiple spills to merge, so none of these spill files' lengths were counted
+        // towards our shuffle write count or shuffle write time. If we use the slow merge path,
+        // then the final output file's size won't necessarily be equal to the sum of the spill
+        // files' sizes. To guard against this case, we look at the output file's actual size when
+        // computing shuffle bytes written.
+        //
+        // We allow the individual merge methods to report their own IO times since different merge
+        // strategies use different IO techniques.  We count IO during merge towards the shuffle
+        // shuffle write time, which appears to be consistent with the "not bypassing merge-sort"
+        // branch in ExternalSorter.
+        if (fastMergeEnabled && fastMergeIsSupported) {
+          // Compression is disabled or we are using an IO compression codec that supports
+          // decompression of concatenated compressed streams, so we can perform a fast spill merge
+          // that doesn't need to interpret the spilled bytes.
+          if (transferToEnabled) {
+            logger.debug("Using transferTo-based fast merge");
+            partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
+          } else {
+            logger.debug("Using fileStream-based fast merge");
+            partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
+          }
+        } else {
+          logger.debug("Using slow merge");
+          partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
+        }
+        // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
+        // in-memory records, we write out the in-memory records to a file but do not count that
+        // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
+        // to be counted as shuffle write, but this will lead to double-counting of the final
+        // SpillInfo's bytes.
+        writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
+        writeMetrics.incShuffleBytesWritten(outputFile.length());
+        return partitionLengths;
+      }
+    } catch (IOException e) {
+      if (outputFile.exists() && !outputFile.delete()) {
+        logger.error("Unable to delete output file {}", outputFile.getPath());
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Merges spill files using Java FileStreams. This code path is slower than the NIO-based merge,
+   * {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File)}, so it's only used in
+   * cases where the IO compression codec does not support concatenation of compressed data, or in
+   * cases where users have explicitly disabled use of {@code transferTo} in order to work around
+   * kernel bugs.
+   *
+   * @param spills the spills to merge.
+   * @param outputFile the file to write the merged data to.
+   * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled.
+   * @return the partition lengths in the merged file.
+   */
+  private long[] mergeSpillsWithFileStream(
+      SpillInfo[] spills,
+      File outputFile,
+      @Nullable CompressionCodec compressionCodec) throws IOException {
+    assert (spills.length >= 2);
+    final int numPartitions = partitioner.numPartitions();
+    final long[] partitionLengths = new long[numPartitions];
+    final InputStream[] spillInputStreams = new FileInputStream[spills.length];
+    OutputStream mergedFileOutputStream = null;
+
+    boolean threwException = true;
+    try {
+      for (int i = 0; i < spills.length; i++) {
+        spillInputStreams[i] = new FileInputStream(spills[i].file);
+      }
+      for (int partition = 0; partition < numPartitions; partition++) {
+        final long initialFileLength = outputFile.length();
+        mergedFileOutputStream =
+          new TimeTrackingOutputStream(writeMetrics, new FileOutputStream(outputFile, true));
+        if (compressionCodec != null) {
+          mergedFileOutputStream = compressionCodec.compressedOutputStream(mergedFileOutputStream);
+        }
+
+        for (int i = 0; i < spills.length; i++) {
+          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
+          if (partitionLengthInSpill > 0) {
+            InputStream partitionInputStream =
+              new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill);
+            if (compressionCodec != null) {
+              partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+            }
+            ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
+          }
+        }
+        mergedFileOutputStream.flush();
+        mergedFileOutputStream.close();
+        partitionLengths[partition] = (outputFile.length() - initialFileLength);
+      }
+      threwException = false;
+    } finally {
+      // To avoid masking exceptions that caused us to prematurely enter the finally block, only
+      // throw exceptions during cleanup if threwException == false.
+      for (InputStream stream : spillInputStreams) {
+        Closeables.close(stream, threwException);
+      }
+      Closeables.close(mergedFileOutputStream, threwException);
+    }
+    return partitionLengths;
+  }
+
+  /**
+   * Merges spill files by using NIO's transferTo to concatenate spill partitions' bytes.
+   * This is only safe when the IO compression codec and serializer support concatenation of
+   * serialized streams.
+   *
+   * @return the partition lengths in the merged file.
+   */
+  private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
+    assert (spills.length >= 2);
+    final int numPartitions = partitioner.numPartitions();
+    final long[] partitionLengths = new long[numPartitions];
+    final FileChannel[] spillInputChannels = new FileChannel[spills.length];
+    final long[] spillInputChannelPositions = new long[spills.length];
+    FileChannel mergedFileOutputChannel = null;
+
+    boolean threwException = true;
+    try {
+      for (int i = 0; i < spills.length; i++) {
+        spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+      }
+      // This file needs to opened in append mode in order to work around a Linux kernel bug that
+      // affects transferTo; see SPARK-3948 for more details.
+      mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
+
+      long bytesWrittenToMergedFile = 0;
+      for (int partition = 0; partition < numPartitions; partition++) {
+        for (int i = 0; i < spills.length; i++) {
+          final long partitionLengthInSpill = spills[i].partitionLengths[partition];
+          long bytesToTransfer = partitionLengthInSpill;
+          final FileChannel spillInputChannel = spillInputChannels[i];
+          final long writeStartTime = System.nanoTime();
+          while (bytesToTransfer > 0) {
+            final long actualBytesTransferred = spillInputChannel.transferTo(
+              spillInputChannelPositions[i],
+              bytesToTransfer,
+              mergedFileOutputChannel);
+            spillInputChannelPositions[i] += actualBytesTransferred;
+            bytesToTransfer -= actualBytesTransferred;
+          }
+          writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+          bytesWrittenToMergedFile += partitionLengthInSpill;
+          partitionLengths[partition] += partitionLengthInSpill;
+        }
+      }
+      // Check the position after transferTo loop to see if it is in the right position and raise an
+      // exception if it is incorrect. The position will not be increased to the expected length
+      // after calling transferTo in kernel version 2.6.32. This issue is described at
+      // https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
+      if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) {
+        throw new IOException(
+          "Current position " + mergedFileOutputChannel.position() + " does not equal expected " +
+            "position " + bytesWrittenToMergedFile + " after transferTo. Please check your kernel" +
+            " version to see if it is 2.6.32, as there is a kernel bug which will lead to " +
+            "unexpected behavior when using transferTo. You can set spark.file.transferTo=false " +
+            "to disable this NIO feature."
+        );
+      }
+      threwException = false;
+    } finally {
+      // To avoid masking exceptions that caused us to prematurely enter the finally block, only
+      // throw exceptions during cleanup if threwException == false.
+      for (int i = 0; i < spills.length; i++) {
+        assert(spillInputChannelPositions[i] == spills[i].file.length());
+        Closeables.close(spillInputChannels[i], threwException);
+      }
+      Closeables.close(mergedFileOutputChannel, threwException);
+    }
+    return partitionLengths;
+  }
+
+  @Override
+  public Option<MapStatus> stop(boolean success) {
+    try {
+      // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
+      Map<String, Accumulator<Object>> internalAccumulators =
+        taskContext.internalMetricsToAccumulators();
+      if (internalAccumulators != null) {
+        internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
+          .add(getPeakMemoryUsedBytes());
+      }
+
+      if (stopping) {
+        return Option.apply(null);
+      } else {
+        stopping = true;
+        if (success) {
+          if (mapStatus == null) {
+            throw new IllegalStateException("Cannot call stop(true) without having called write()");
+          }
+          return Option.apply(mapStatus);
+        } else {
+          // The map task failed, so delete our output data.
+          shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
+          return Option.apply(null);
+        }
+      }
+    } finally {
+      if (sorter != null) {
+        // If sorter is non-null, then this implies that we called stop() in response to an error,
+        // so we need to clean up memory and spill files created by the sorter
+        sorter.cleanupResources();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
deleted file mode 100644
index 4ee6a82..0000000
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-/**
- * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
- * <p>
- * Within the long, the data is laid out as follows:
- * <pre>
- *   [24 bit partition number][13 bit memory page number][27 bit offset in page]
- * </pre>
- * This implies that the maximum addressable page size is 2^27 bits = 128 megabytes, assuming that
- * our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
- * 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
- * implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
- * <p>
- * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
- * optimization to future work as it will require more careful design to ensure that addresses are
- * properly aligned (e.g. by padding records).
- */
-final class PackedRecordPointer {
-
-  static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes
-
-  /**
-   * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
-   */
-  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;  // 16777215
-
-  /** Bit mask for the lower 40 bits of a long. */
-  private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;
-
-  /** Bit mask for the upper 24 bits of a long */
-  private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;
-
-  /** Bit mask for the lower 27 bits of a long. */
-  private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;
-
-  /** Bit mask for the lower 51 bits of a long. */
-  private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;
-
-  /** Bit mask for the upper 13 bits of a long */
-  private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
-
-  /**
-   * Pack a record address and partition id into a single word.
-   *
-   * @param recordPointer a record pointer encoded by TaskMemoryManager.
-   * @param partitionId a shuffle partition id (maximum value of 2^24).
-   * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
-   */
-  public static long packPointer(long recordPointer, int partitionId) {
-    assert (partitionId <= MAXIMUM_PARTITION_ID);
-    // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
-    // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
-    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
-    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
-    return (((long) partitionId) << 40) | compressedAddress;
-  }
-
-  private long packedRecordPointer;
-
-  public void set(long packedRecordPointer) {
-    this.packedRecordPointer = packedRecordPointer;
-  }
-
-  public int getPartitionId() {
-    return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
-  }
-
-  public long getRecordPointer() {
-    final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
-    final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
-    return pageNumber | offsetInPage;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
deleted file mode 100644
index 7bac0dc..0000000
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-import java.io.File;
-
-import org.apache.spark.storage.TempShuffleBlockId;
-
-/**
- * Metadata for a block of data written by {@link UnsafeShuffleExternalSorter}.
- */
-final class SpillInfo {
-  final long[] partitionLengths;
-  final File file;
-  final TempShuffleBlockId blockId;
-
-  public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
-    this.partitionLengths = new long[numPartitions];
-    this.file = file;
-    this.blockId = blockId;
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org