You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/22 20:58:58 UTC

git commit: SPARK-2047: Introduce an in-mem Sorter, and use it to reduce mem usage

Repository: spark
Updated Branches:
  refs/heads/master 140787173 -> 85d3596e6


SPARK-2047: Introduce an in-mem Sorter, and use it to reduce mem usage

### Why and what?
Currently, the AppendOnlyMap performs an "in-place" sort by converting its array of [key, value, key, value] pairs into a an array of [(key, value), (key, value)] pairs. However, this causes us to allocate many Tuple2 objects, which come at a nontrivial overhead.

This patch adds a Sorter API, intended for in memory sorts, which simply ports the Android Timsort implementation (available under Apache v2) and abstracts the interface in a way which introduces no more than 1 virtual function invocation of overhead at each abstraction point.

Please compare our port of the Android Timsort sort with the original implementation: http://www.diffchecker.com/wiwrykcl

### Memory implications
An AppendOnlyMap contains N kv pairs, which results in roughly 2N elements within its underlying array. Each of these elements is 4 bytes wide in a [compressed OOPS](https://wikis.oracle.com/display/HotSpotInternals/CompressedOops) system, which is the default.

Today's approach immediately allocates N Tuple2 objects, which take up 24N bytes in total (exposed via YourKit), and undergoes a Java sort. The Java 6 version immediately copies the entire array (4N bytes here), while the Java 7 version has a worst-case allocation of half the array (2N bytes).
This results in a worst-case sorting overhead of 24N + 2N = 26N bytes (for Java 7).

The Sorter does not require allocating any tuples, but since it uses Timsort, it may copy up to half the entire array in the worst case.
This results in a worst-case sorting overhead of 4N bytes.

Thus, we have reduced the worst-case overhead of the sort by roughly 22 bytes times the number of elements.

### Performance implications
As the destructiveSortedIterator is used for spilling in an ExternalAppendOnlyMap, the purpose of this patch is to provide stability by reducing memory usage rather than improve performance. However, because it implements Timsort, it also brings a substantial performance boost over our prior implementation.

Here are the results of a microbenchmark that sorted 25 million, randomly distributed (Float, Int) pairs. The Java Arrays.sort() tests were run **only on the keys**, and thus moved less data. Our current implementation is called "Tuple-sort using Arrays.sort()" while the new implementation is "KV-array using Sorter".

<table>
<tr><th>Test</th><th>First run (JDK6)</th><th>Average of 10 (JDK6)</th><th>First run (JDK7)</th><th>Average of 10 (JDK7)</th></tr>
<tr><td>primitive Arrays.sort()</td><td>3216 ms</td><td>1190 ms</td><td>2724 ms</td><td>131 ms (!!)</td></tr>
<tr><td>Arrays.sort()</td><td>18564 ms</td><td>2006 ms</td><td>13201 ms</td><td>878 ms</td></tr>
<tr><td>Tuple-sort using Arrays.sort()</td><td>31813 ms</td><td>3550 ms</td><td>20990 ms</td><td>1919 ms</td></tr>
<tr><td><b>KV-array using Sorter</b></td><td></td><td></td><td><b>15020 ms</b></td><td><b>834 ms</b></td></tr>
</table>

The results show that this Sorter performs exactly as expected (after the first run) -- it is as fast as the Java 7 Arrays.sort() (which shares the same algorithm), but is significantly faster than the Tuple-sort on Java 6 or 7.

In short, this patch should significantly improve performance for users running either Java 6 or 7.

Author: Aaron Davidson <aa...@databricks.com>

Closes #1502 from aarondav/sort and squashes the following commits:

652d936 [Aaron Davidson] Update license, move Sorter to java src
a7b5b1c [Aaron Davidson] fix licenses
5c0efaf [Aaron Davidson] Update tmpLength
ec395c8 [Aaron Davidson] Ignore benchmark (again) and fix docs
034bf10 [Aaron Davidson] Change to Apache v2 Timsort
b97296c [Aaron Davidson] Don't try to run benchmark on Jenkins + private[spark]
6307338 [Aaron Davidson] SPARK-2047: Introduce an in-mem Sorter, and use it to reduce mem usage


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85d3596e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85d3596e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85d3596e

Branch: refs/heads/master
Commit: 85d3596e65512d481f4be54df100be6bdc9c8e29
Parents: 1407871
Author: Aaron Davidson <aa...@databricks.com>
Authored: Tue Jul 22 11:58:53 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Tue Jul 22 11:58:53 2014 -0700

----------------------------------------------------------------------
 LICENSE                                         |  20 +-
 .../apache/spark/util/collection/Sorter.java    | 915 +++++++++++++++++++
 .../spark/util/collection/AppendOnlyMap.scala   |  15 +-
 .../util/collection/ExternalAppendOnlyMap.scala |  35 +-
 .../spark/util/collection/SortDataFormat.scala  |  94 ++
 .../util/collection/AppendOnlyMapSuite.scala    |   8 +-
 .../spark/util/collection/SorterSuite.scala     | 167 ++++
 7 files changed, 1222 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/85d3596e/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 383f079..65e1f48 100644
--- a/LICENSE
+++ b/LICENSE
@@ -442,7 +442,7 @@ Written by Pavel Binko, Dino Ferrero Merlino, Wolfgang Hoschek, Tony Johnson, An
 
 
 ========================================================================
-Fo SnapTree:
+For SnapTree:
 ========================================================================
 
 SNAPTREE LICENSE
@@ -483,6 +483,24 @@ SUCH DAMAGE.
 
 
 ========================================================================
+For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
+========================================================================
+Copyright (C) 2008 The Android Open Source Project
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+========================================================================
 BSD-style licenses
 ========================================================================
 

http://git-wip-us.apache.org/repos/asf/spark/blob/85d3596e/core/src/main/java/org/apache/spark/util/collection/Sorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/Sorter.java b/core/src/main/java/org/apache/spark/util/collection/Sorter.java
new file mode 100644
index 0000000..64ad18c
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/util/collection/Sorter.java
@@ -0,0 +1,915 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection;
+
+import java.util.Comparator;
+
+/**
+ * A port of the Android Timsort class, which utilizes a "stable, adaptive, iterative mergesort."
+ * See the method comment on sort() for more details.
+ *
+ * This has been kept in Java with the original style in order to match very closely with the
+ * Anroid source code, and thus be easy to verify correctness.
+ *
+ * The purpose of the port is to generalize the interface to the sort to accept input data formats
+ * besides simple arrays where every element is sorted individually. For instance, the AppendOnlyMap
+ * uses this to sort an Array with alternating elements of the form [key, value, key, value].
+ * This generalization comes with minimal overhead -- see SortDataFormat for more information.
+ */
+class Sorter<K, Buffer> {
+
+  /**
+   * This is the minimum sized sequence that will be merged.  Shorter
+   * sequences will be lengthened by calling binarySort.  If the entire
+   * array is less than this length, no merges will be performed.
+   *
+   * This constant should be a power of two.  It was 64 in Tim Peter's C
+   * implementation, but 32 was empirically determined to work better in
+   * this implementation.  In the unlikely event that you set this constant
+   * to be a number that's not a power of two, you'll need to change the
+   * minRunLength computation.
+   *
+   * If you decrease this constant, you must change the stackLen
+   * computation in the TimSort constructor, or you risk an
+   * ArrayOutOfBounds exception.  See listsort.txt for a discussion
+   * of the minimum stack length required as a function of the length
+   * of the array being sorted and the minimum merge sequence length.
+   */
+  private static final int MIN_MERGE = 32;
+
+  private final SortDataFormat<K, Buffer> s;
+
+  public Sorter(SortDataFormat<K, Buffer> sortDataFormat) {
+    this.s = sortDataFormat;
+  }
+
+  /**
+   * A stable, adaptive, iterative mergesort that requires far fewer than
+   * n lg(n) comparisons when running on partially sorted arrays, while
+   * offering performance comparable to a traditional mergesort when run
+   * on random arrays.  Like all proper mergesorts, this sort is stable and
+   * runs O(n log n) time (worst case).  In the worst case, this sort requires
+   * temporary storage space for n/2 object references; in the best case,
+   * it requires only a small constant amount of space.
+   *
+   * This implementation was adapted from Tim Peters's list sort for
+   * Python, which is described in detail here:
+   *
+   *   http://svn.python.org/projects/python/trunk/Objects/listsort.txt
+   *
+   * Tim's C code may be found here:
+   *
+   *   http://svn.python.org/projects/python/trunk/Objects/listobject.c
+   *
+   * The underlying techniques are described in this paper (and may have
+   * even earlier origins):
+   *
+   *  "Optimistic Sorting and Information Theoretic Complexity"
+   *  Peter McIlroy
+   *  SODA (Fourth Annual ACM-SIAM Symposium on Discrete Algorithms),
+   *  pp 467-474, Austin, Texas, 25-27 January 1993.
+   *
+   * While the API to this class consists solely of static methods, it is
+   * (privately) instantiable; a TimSort instance holds the state of an ongoing
+   * sort, assuming the input array is large enough to warrant the full-blown
+   * TimSort. Small arrays are sorted in place, using a binary insertion sort.
+   *
+   * @author Josh Bloch
+   */
+  void sort(Buffer a, int lo, int hi, Comparator<? super K> c) {
+    assert c != null;
+
+    int nRemaining  = hi - lo;
+    if (nRemaining < 2)
+      return;  // Arrays of size 0 and 1 are always sorted
+
+    // If array is small, do a "mini-TimSort" with no merges
+    if (nRemaining < MIN_MERGE) {
+      int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
+      binarySort(a, lo, hi, lo + initRunLen, c);
+      return;
+    }
+
+    /**
+     * March over the array once, left to right, finding natural runs,
+     * extending short natural runs to minRun elements, and merging runs
+     * to maintain stack invariant.
+     */
+    SortState sortState = new SortState(a, c, hi - lo);
+    int minRun = minRunLength(nRemaining);
+    do {
+      // Identify next run
+      int runLen = countRunAndMakeAscending(a, lo, hi, c);
+
+      // If run is short, extend to min(minRun, nRemaining)
+      if (runLen < minRun) {
+        int force = nRemaining <= minRun ? nRemaining : minRun;
+        binarySort(a, lo, lo + force, lo + runLen, c);
+        runLen = force;
+      }
+
+      // Push run onto pending-run stack, and maybe merge
+      sortState.pushRun(lo, runLen);
+      sortState.mergeCollapse();
+
+      // Advance to find next run
+      lo += runLen;
+      nRemaining -= runLen;
+    } while (nRemaining != 0);
+
+    // Merge all remaining runs to complete sort
+    assert lo == hi;
+    sortState.mergeForceCollapse();
+    assert sortState.stackSize == 1;
+  }
+
+  /**
+   * Sorts the specified portion of the specified array using a binary
+   * insertion sort.  This is the best method for sorting small numbers
+   * of elements.  It requires O(n log n) compares, but O(n^2) data
+   * movement (worst case).
+   *
+   * If the initial part of the specified range is already sorted,
+   * this method can take advantage of it: the method assumes that the
+   * elements from index {@code lo}, inclusive, to {@code start},
+   * exclusive are already sorted.
+   *
+   * @param a the array in which a range is to be sorted
+   * @param lo the index of the first element in the range to be sorted
+   * @param hi the index after the last element in the range to be sorted
+   * @param start the index of the first element in the range that is
+   *        not already known to be sorted ({@code lo <= start <= hi})
+   * @param c comparator to used for the sort
+   */
+  @SuppressWarnings("fallthrough")
+  private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super K> c) {
+    assert lo <= start && start <= hi;
+    if (start == lo)
+      start++;
+
+    Buffer pivotStore = s.allocate(1);
+    for ( ; start < hi; start++) {
+      s.copyElement(a, start, pivotStore, 0);
+      K pivot = s.getKey(pivotStore, 0);
+
+      // Set left (and right) to the index where a[start] (pivot) belongs
+      int left = lo;
+      int right = start;
+      assert left <= right;
+      /*
+       * Invariants:
+       *   pivot >= all in [lo, left).
+       *   pivot <  all in [right, start).
+       */
+      while (left < right) {
+        int mid = (left + right) >>> 1;
+        if (c.compare(pivot, s.getKey(a, mid)) < 0)
+          right = mid;
+        else
+          left = mid + 1;
+      }
+      assert left == right;
+
+      /*
+       * The invariants still hold: pivot >= all in [lo, left) and
+       * pivot < all in [left, start), so pivot belongs at left.  Note
+       * that if there are elements equal to pivot, left points to the
+       * first slot after them -- that's why this sort is stable.
+       * Slide elements over to make room for pivot.
+       */
+      int n = start - left;  // The number of elements to move
+      // Switch is just an optimization for arraycopy in default case
+      switch (n) {
+        case 2:  s.copyElement(a, left + 1, a, left + 2);
+        case 1:  s.copyElement(a, left, a, left + 1);
+          break;
+        default: s.copyRange(a, left, a, left + 1, n);
+      }
+      s.copyElement(pivotStore, 0, a, left);
+    }
+  }
+
+  /**
+   * Returns the length of the run beginning at the specified position in
+   * the specified array and reverses the run if it is descending (ensuring
+   * that the run will always be ascending when the method returns).
+   *
+   * A run is the longest ascending sequence with:
+   *
+   *    a[lo] <= a[lo + 1] <= a[lo + 2] <= ...
+   *
+   * or the longest descending sequence with:
+   *
+   *    a[lo] >  a[lo + 1] >  a[lo + 2] >  ...
+   *
+   * For its intended use in a stable mergesort, the strictness of the
+   * definition of "descending" is needed so that the call can safely
+   * reverse a descending sequence without violating stability.
+   *
+   * @param a the array in which a run is to be counted and possibly reversed
+   * @param lo index of the first element in the run
+   * @param hi index after the last element that may be contained in the run.
+  It is required that {@code lo < hi}.
+   * @param c the comparator to used for the sort
+   * @return  the length of the run beginning at the specified position in
+   *          the specified array
+   */
+  private int countRunAndMakeAscending(Buffer a, int lo, int hi, Comparator<? super K> c) {
+    assert lo < hi;
+    int runHi = lo + 1;
+    if (runHi == hi)
+      return 1;
+
+    // Find end of run, and reverse range if descending
+    if (c.compare(s.getKey(a, runHi++), s.getKey(a, lo)) < 0) { // Descending
+      while (runHi < hi && c.compare(s.getKey(a, runHi), s.getKey(a, runHi - 1)) < 0)
+        runHi++;
+      reverseRange(a, lo, runHi);
+    } else {                              // Ascending
+      while (runHi < hi && c.compare(s.getKey(a, runHi), s.getKey(a, runHi - 1)) >= 0)
+        runHi++;
+    }
+
+    return runHi - lo;
+  }
+
+  /**
+   * Reverse the specified range of the specified array.
+   *
+   * @param a the array in which a range is to be reversed
+   * @param lo the index of the first element in the range to be reversed
+   * @param hi the index after the last element in the range to be reversed
+   */
+  private void reverseRange(Buffer a, int lo, int hi) {
+    hi--;
+    while (lo < hi) {
+      s.swap(a, lo, hi);
+      lo++;
+      hi--;
+    }
+  }
+
+  /**
+   * Returns the minimum acceptable run length for an array of the specified
+   * length. Natural runs shorter than this will be extended with
+   * {@link #binarySort}.
+   *
+   * Roughly speaking, the computation is:
+   *
+   *  If n < MIN_MERGE, return n (it's too small to bother with fancy stuff).
+   *  Else if n is an exact power of 2, return MIN_MERGE/2.
+   *  Else return an int k, MIN_MERGE/2 <= k <= MIN_MERGE, such that n/k
+   *   is close to, but strictly less than, an exact power of 2.
+   *
+   * For the rationale, see listsort.txt.
+   *
+   * @param n the length of the array to be sorted
+   * @return the length of the minimum run to be merged
+   */
+  private int minRunLength(int n) {
+    assert n >= 0;
+    int r = 0;      // Becomes 1 if any 1 bits are shifted off
+    while (n >= MIN_MERGE) {
+      r |= (n & 1);
+      n >>= 1;
+    }
+    return n + r;
+  }
+
+  private class SortState {
+
+    /**
+     * The Buffer being sorted.
+     */
+    private final Buffer a;
+
+    /**
+     * Length of the sort Buffer.
+     */
+    private final int aLength;
+
+    /**
+     * The comparator for this sort.
+     */
+    private final Comparator<? super K> c;
+
+    /**
+     * When we get into galloping mode, we stay there until both runs win less
+     * often than MIN_GALLOP consecutive times.
+     */
+    private static final int  MIN_GALLOP = 7;
+
+    /**
+     * This controls when we get *into* galloping mode.  It is initialized
+     * to MIN_GALLOP.  The mergeLo and mergeHi methods nudge it higher for
+     * random data, and lower for highly structured data.
+     */
+    private int minGallop = MIN_GALLOP;
+
+    /**
+     * Maximum initial size of tmp array, which is used for merging.  The array
+     * can grow to accommodate demand.
+     *
+     * Unlike Tim's original C version, we do not allocate this much storage
+     * when sorting smaller arrays.  This change was required for performance.
+     */
+    private static final int INITIAL_TMP_STORAGE_LENGTH = 256;
+
+    /**
+     * Temp storage for merges.
+     */
+    private Buffer tmp; // Actual runtime type will be Object[], regardless of T
+
+    /**
+     * Length of the temp storage.
+     */
+    private int tmpLength = 0;
+
+    /**
+     * A stack of pending runs yet to be merged.  Run i starts at
+     * address base[i] and extends for len[i] elements.  It's always
+     * true (so long as the indices are in bounds) that:
+     *
+     *     runBase[i] + runLen[i] == runBase[i + 1]
+     *
+     * so we could cut the storage for this, but it's a minor amount,
+     * and keeping all the info explicit simplifies the code.
+     */
+    private int stackSize = 0;  // Number of pending runs on stack
+    private final int[] runBase;
+    private final int[] runLen;
+
+    /**
+     * Creates a TimSort instance to maintain the state of an ongoing sort.
+     *
+     * @param a the array to be sorted
+     * @param c the comparator to determine the order of the sort
+     */
+    private SortState(Buffer a, Comparator<? super K> c, int len) {
+      this.aLength = len;
+      this.a = a;
+      this.c = c;
+
+      // Allocate temp storage (which may be increased later if necessary)
+      tmpLength = len < 2 * INITIAL_TMP_STORAGE_LENGTH ? len >>> 1 : INITIAL_TMP_STORAGE_LENGTH;
+      tmp = s.allocate(tmpLength);
+
+      /*
+       * Allocate runs-to-be-merged stack (which cannot be expanded).  The
+       * stack length requirements are described in listsort.txt.  The C
+       * version always uses the same stack length (85), but this was
+       * measured to be too expensive when sorting "mid-sized" arrays (e.g.,
+       * 100 elements) in Java.  Therefore, we use smaller (but sufficiently
+       * large) stack lengths for smaller arrays.  The "magic numbers" in the
+       * computation below must be changed if MIN_MERGE is decreased.  See
+       * the MIN_MERGE declaration above for more information.
+       */
+      int stackLen = (len <    120  ?  5 :
+                      len <   1542  ? 10 :
+                      len < 119151  ? 19 : 40);
+      runBase = new int[stackLen];
+      runLen = new int[stackLen];
+    }
+
+    /**
+     * Pushes the specified run onto the pending-run stack.
+     *
+     * @param runBase index of the first element in the run
+     * @param runLen  the number of elements in the run
+     */
+    private void pushRun(int runBase, int runLen) {
+      this.runBase[stackSize] = runBase;
+      this.runLen[stackSize] = runLen;
+      stackSize++;
+    }
+
+    /**
+     * Examines the stack of runs waiting to be merged and merges adjacent runs
+     * until the stack invariants are reestablished:
+     *
+     *     1. runLen[i - 3] > runLen[i - 2] + runLen[i - 1]
+     *     2. runLen[i - 2] > runLen[i - 1]
+     *
+     * This method is called each time a new run is pushed onto the stack,
+     * so the invariants are guaranteed to hold for i < stackSize upon
+     * entry to the method.
+     */
+    private void mergeCollapse() {
+      while (stackSize > 1) {
+        int n = stackSize - 2;
+        if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
+          if (runLen[n - 1] < runLen[n + 1])
+            n--;
+          mergeAt(n);
+        } else if (runLen[n] <= runLen[n + 1]) {
+          mergeAt(n);
+        } else {
+          break; // Invariant is established
+        }
+      }
+    }
+
+    /**
+     * Merges all runs on the stack until only one remains.  This method is
+     * called once, to complete the sort.
+     */
+    private void mergeForceCollapse() {
+      while (stackSize > 1) {
+        int n = stackSize - 2;
+        if (n > 0 && runLen[n - 1] < runLen[n + 1])
+          n--;
+        mergeAt(n);
+      }
+    }
+
+    /**
+     * Merges the two runs at stack indices i and i+1.  Run i must be
+     * the penultimate or antepenultimate run on the stack.  In other words,
+     * i must be equal to stackSize-2 or stackSize-3.
+     *
+     * @param i stack index of the first of the two runs to merge
+     */
+    private void mergeAt(int i) {
+      assert stackSize >= 2;
+      assert i >= 0;
+      assert i == stackSize - 2 || i == stackSize - 3;
+
+      int base1 = runBase[i];
+      int len1 = runLen[i];
+      int base2 = runBase[i + 1];
+      int len2 = runLen[i + 1];
+      assert len1 > 0 && len2 > 0;
+      assert base1 + len1 == base2;
+
+      /*
+       * Record the length of the combined runs; if i is the 3rd-last
+       * run now, also slide over the last run (which isn't involved
+       * in this merge).  The current run (i+1) goes away in any case.
+       */
+      runLen[i] = len1 + len2;
+      if (i == stackSize - 3) {
+        runBase[i + 1] = runBase[i + 2];
+        runLen[i + 1] = runLen[i + 2];
+      }
+      stackSize--;
+
+      /*
+       * Find where the first element of run2 goes in run1. Prior elements
+       * in run1 can be ignored (because they're already in place).
+       */
+      int k = gallopRight(s.getKey(a, base2), a, base1, len1, 0, c);
+      assert k >= 0;
+      base1 += k;
+      len1 -= k;
+      if (len1 == 0)
+        return;
+
+      /*
+       * Find where the last element of run1 goes in run2. Subsequent elements
+       * in run2 can be ignored (because they're already in place).
+       */
+      len2 = gallopLeft(s.getKey(a, base1 + len1 - 1), a, base2, len2, len2 - 1, c);
+      assert len2 >= 0;
+      if (len2 == 0)
+        return;
+
+      // Merge remaining runs, using tmp array with min(len1, len2) elements
+      if (len1 <= len2)
+        mergeLo(base1, len1, base2, len2);
+      else
+        mergeHi(base1, len1, base2, len2);
+    }
+
+    /**
+     * Locates the position at which to insert the specified key into the
+     * specified sorted range; if the range contains an element equal to key,
+     * returns the index of the leftmost equal element.
+     *
+     * @param key the key whose insertion point to search for
+     * @param a the array in which to search
+     * @param base the index of the first element in the range
+     * @param len the length of the range; must be > 0
+     * @param hint the index at which to begin the search, 0 <= hint < n.
+     *     The closer hint is to the result, the faster this method will run.
+     * @param c the comparator used to order the range, and to search
+     * @return the int k,  0 <= k <= n such that a[b + k - 1] < key <= a[b + k],
+     *    pretending that a[b - 1] is minus infinity and a[b + n] is infinity.
+     *    In other words, key belongs at index b + k; or in other words,
+     *    the first k elements of a should precede key, and the last n - k
+     *    should follow it.
+     */
+    private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator<? super K> c) {
+      assert len > 0 && hint >= 0 && hint < len;
+      int lastOfs = 0;
+      int ofs = 1;
+      if (c.compare(key, s.getKey(a, base + hint)) > 0) {
+        // Gallop right until a[base+hint+lastOfs] < key <= a[base+hint+ofs]
+        int maxOfs = len - hint;
+        while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) > 0) {
+          lastOfs = ofs;
+          ofs = (ofs << 1) + 1;
+          if (ofs <= 0)   // int overflow
+            ofs = maxOfs;
+        }
+        if (ofs > maxOfs)
+          ofs = maxOfs;
+
+        // Make offsets relative to base
+        lastOfs += hint;
+        ofs += hint;
+      } else { // key <= a[base + hint]
+        // Gallop left until a[base+hint-ofs] < key <= a[base+hint-lastOfs]
+        final int maxOfs = hint + 1;
+        while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) <= 0) {
+          lastOfs = ofs;
+          ofs = (ofs << 1) + 1;
+          if (ofs <= 0)   // int overflow
+            ofs = maxOfs;
+        }
+        if (ofs > maxOfs)
+          ofs = maxOfs;
+
+        // Make offsets relative to base
+        int tmp = lastOfs;
+        lastOfs = hint - ofs;
+        ofs = hint - tmp;
+      }
+      assert -1 <= lastOfs && lastOfs < ofs && ofs <= len;
+
+      /*
+       * Now a[base+lastOfs] < key <= a[base+ofs], so key belongs somewhere
+       * to the right of lastOfs but no farther right than ofs.  Do a binary
+       * search, with invariant a[base + lastOfs - 1] < key <= a[base + ofs].
+       */
+      lastOfs++;
+      while (lastOfs < ofs) {
+        int m = lastOfs + ((ofs - lastOfs) >>> 1);
+
+        if (c.compare(key, s.getKey(a, base + m)) > 0)
+          lastOfs = m + 1;  // a[base + m] < key
+        else
+          ofs = m;          // key <= a[base + m]
+      }
+      assert lastOfs == ofs;    // so a[base + ofs - 1] < key <= a[base + ofs]
+      return ofs;
+    }
+
+    /**
+     * Like gallopLeft, except that if the range contains an element equal to
+     * key, gallopRight returns the index after the rightmost equal element.
+     *
+     * @param key the key whose insertion point to search for
+     * @param a the array in which to search
+     * @param base the index of the first element in the range
+     * @param len the length of the range; must be > 0
+     * @param hint the index at which to begin the search, 0 <= hint < n.
+     *     The closer hint is to the result, the faster this method will run.
+     * @param c the comparator used to order the range, and to search
+     * @return the int k,  0 <= k <= n such that a[b + k - 1] <= key < a[b + k]
+     */
+    private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator<? super K> c) {
+      assert len > 0 && hint >= 0 && hint < len;
+
+      int ofs = 1;
+      int lastOfs = 0;
+      if (c.compare(key, s.getKey(a, base + hint)) < 0) {
+        // Gallop left until a[b+hint - ofs] <= key < a[b+hint - lastOfs]
+        int maxOfs = hint + 1;
+        while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) < 0) {
+          lastOfs = ofs;
+          ofs = (ofs << 1) + 1;
+          if (ofs <= 0)   // int overflow
+            ofs = maxOfs;
+        }
+        if (ofs > maxOfs)
+          ofs = maxOfs;
+
+        // Make offsets relative to b
+        int tmp = lastOfs;
+        lastOfs = hint - ofs;
+        ofs = hint - tmp;
+      } else { // a[b + hint] <= key
+        // Gallop right until a[b+hint + lastOfs] <= key < a[b+hint + ofs]
+        int maxOfs = len - hint;
+        while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) >= 0) {
+          lastOfs = ofs;
+          ofs = (ofs << 1) + 1;
+          if (ofs <= 0)   // int overflow
+            ofs = maxOfs;
+        }
+        if (ofs > maxOfs)
+          ofs = maxOfs;
+
+        // Make offsets relative to b
+        lastOfs += hint;
+        ofs += hint;
+      }
+      assert -1 <= lastOfs && lastOfs < ofs && ofs <= len;
+
+      /*
+       * Now a[b + lastOfs] <= key < a[b + ofs], so key belongs somewhere to
+       * the right of lastOfs but no farther right than ofs.  Do a binary
+       * search, with invariant a[b + lastOfs - 1] <= key < a[b + ofs].
+       */
+      lastOfs++;
+      while (lastOfs < ofs) {
+        int m = lastOfs + ((ofs - lastOfs) >>> 1);
+
+        if (c.compare(key, s.getKey(a, base + m)) < 0)
+          ofs = m;          // key < a[b + m]
+        else
+          lastOfs = m + 1;  // a[b + m] <= key
+      }
+      assert lastOfs == ofs;    // so a[b + ofs - 1] <= key < a[b + ofs]
+      return ofs;
+    }
+
+    /**
+     * Merges two adjacent runs in place, in a stable fashion.  The first
+     * element of the first run must be greater than the first element of the
+     * second run (a[base1] > a[base2]), and the last element of the first run
+     * (a[base1 + len1-1]) must be greater than all elements of the second run.
+     *
+     * For performance, this method should be called only when len1 <= len2;
+     * its twin, mergeHi should be called if len1 >= len2.  (Either method
+     * may be called if len1 == len2.)
+     *
+     * @param base1 index of first element in first run to be merged
+     * @param len1  length of first run to be merged (must be > 0)
+     * @param base2 index of first element in second run to be merged
+     *        (must be aBase + aLen)
+     * @param len2  length of second run to be merged (must be > 0)
+     */
+    private void mergeLo(int base1, int len1, int base2, int len2) {
+      assert len1 > 0 && len2 > 0 && base1 + len1 == base2;
+
+      // Copy first run into temp array
+      Buffer a = this.a; // For performance
+      Buffer tmp = ensureCapacity(len1);
+      s.copyRange(a, base1, tmp, 0, len1);
+
+      int cursor1 = 0;       // Indexes into tmp array
+      int cursor2 = base2;   // Indexes int a
+      int dest = base1;      // Indexes int a
+
+      // Move first element of second run and deal with degenerate cases
+      s.copyElement(a, cursor2++, a, dest++);
+      if (--len2 == 0) {
+        s.copyRange(tmp, cursor1, a, dest, len1);
+        return;
+      }
+      if (len1 == 1) {
+        s.copyRange(a, cursor2, a, dest, len2);
+        s.copyElement(tmp, cursor1, a, dest + len2); // Last elt of run 1 to end of merge
+        return;
+      }
+
+      Comparator<? super K> c = this.c;  // Use local variable for performance
+      int minGallop = this.minGallop;    //  "    "       "     "      "
+      outer:
+      while (true) {
+        int count1 = 0; // Number of times in a row that first run won
+        int count2 = 0; // Number of times in a row that second run won
+
+        /*
+         * Do the straightforward thing until (if ever) one run starts
+         * winning consistently.
+         */
+        do {
+          assert len1 > 1 && len2 > 0;
+          if (c.compare(s.getKey(a, cursor2), s.getKey(tmp, cursor1)) < 0) {
+            s.copyElement(a, cursor2++, a, dest++);
+            count2++;
+            count1 = 0;
+            if (--len2 == 0)
+              break outer;
+          } else {
+            s.copyElement(tmp, cursor1++, a, dest++);
+            count1++;
+            count2 = 0;
+            if (--len1 == 1)
+              break outer;
+          }
+        } while ((count1 | count2) < minGallop);
+
+        /*
+         * One run is winning so consistently that galloping may be a
+         * huge win. So try that, and continue galloping until (if ever)
+         * neither run appears to be winning consistently anymore.
+         */
+        do {
+          assert len1 > 1 && len2 > 0;
+          count1 = gallopRight(s.getKey(a, cursor2), tmp, cursor1, len1, 0, c);
+          if (count1 != 0) {
+            s.copyRange(tmp, cursor1, a, dest, count1);
+            dest += count1;
+            cursor1 += count1;
+            len1 -= count1;
+            if (len1 <= 1) // len1 == 1 || len1 == 0
+              break outer;
+          }
+          s.copyElement(a, cursor2++, a, dest++);
+          if (--len2 == 0)
+            break outer;
+
+          count2 = gallopLeft(s.getKey(tmp, cursor1), a, cursor2, len2, 0, c);
+          if (count2 != 0) {
+            s.copyRange(a, cursor2, a, dest, count2);
+            dest += count2;
+            cursor2 += count2;
+            len2 -= count2;
+            if (len2 == 0)
+              break outer;
+          }
+          s.copyElement(tmp, cursor1++, a, dest++);
+          if (--len1 == 1)
+            break outer;
+          minGallop--;
+        } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP);
+        if (minGallop < 0)
+          minGallop = 0;
+        minGallop += 2;  // Penalize for leaving gallop mode
+      }  // End of "outer" loop
+      this.minGallop = minGallop < 1 ? 1 : minGallop;  // Write back to field
+
+      if (len1 == 1) {
+        assert len2 > 0;
+        s.copyRange(a, cursor2, a, dest, len2);
+        s.copyElement(tmp, cursor1, a, dest + len2); //  Last elt of run 1 to end of merge
+      } else if (len1 == 0) {
+        throw new IllegalArgumentException(
+            "Comparison method violates its general contract!");
+      } else {
+        assert len2 == 0;
+        assert len1 > 1;
+        s.copyRange(tmp, cursor1, a, dest, len1);
+      }
+    }
+
+    /**
+     * Like mergeLo, except that this method should be called only if
+     * len1 >= len2; mergeLo should be called if len1 <= len2.  (Either method
+     * may be called if len1 == len2.)
+     *
+     * @param base1 index of first element in first run to be merged
+     * @param len1  length of first run to be merged (must be > 0)
+     * @param base2 index of first element in second run to be merged
+     *        (must be aBase + aLen)
+     * @param len2  length of second run to be merged (must be > 0)
+     */
+    private void mergeHi(int base1, int len1, int base2, int len2) {
+      assert len1 > 0 && len2 > 0 && base1 + len1 == base2;
+
+      // Copy second run into temp array
+      Buffer a = this.a; // For performance
+      Buffer tmp = ensureCapacity(len2);
+      s.copyRange(a, base2, tmp, 0, len2);
+
+      int cursor1 = base1 + len1 - 1;  // Indexes into a
+      int cursor2 = len2 - 1;          // Indexes into tmp array
+      int dest = base2 + len2 - 1;     // Indexes into a
+
+      // Move last element of first run and deal with degenerate cases
+      s.copyElement(a, cursor1--, a, dest--);
+      if (--len1 == 0) {
+        s.copyRange(tmp, 0, a, dest - (len2 - 1), len2);
+        return;
+      }
+      if (len2 == 1) {
+        dest -= len1;
+        cursor1 -= len1;
+        s.copyRange(a, cursor1 + 1, a, dest + 1, len1);
+        s.copyElement(tmp, cursor2, a, dest);
+        return;
+      }
+
+      Comparator<? super K> c = this.c;  // Use local variable for performance
+      int minGallop = this.minGallop;    //  "    "       "     "      "
+      outer:
+      while (true) {
+        int count1 = 0; // Number of times in a row that first run won
+        int count2 = 0; // Number of times in a row that second run won
+
+        /*
+         * Do the straightforward thing until (if ever) one run
+         * appears to win consistently.
+         */
+        do {
+          assert len1 > 0 && len2 > 1;
+          if (c.compare(s.getKey(tmp, cursor2), s.getKey(a, cursor1)) < 0) {
+            s.copyElement(a, cursor1--, a, dest--);
+            count1++;
+            count2 = 0;
+            if (--len1 == 0)
+              break outer;
+          } else {
+            s.copyElement(tmp, cursor2--, a, dest--);
+            count2++;
+            count1 = 0;
+            if (--len2 == 1)
+              break outer;
+          }
+        } while ((count1 | count2) < minGallop);
+
+        /*
+         * One run is winning so consistently that galloping may be a
+         * huge win. So try that, and continue galloping until (if ever)
+         * neither run appears to be winning consistently anymore.
+         */
+        do {
+          assert len1 > 0 && len2 > 1;
+          count1 = len1 - gallopRight(s.getKey(tmp, cursor2), a, base1, len1, len1 - 1, c);
+          if (count1 != 0) {
+            dest -= count1;
+            cursor1 -= count1;
+            len1 -= count1;
+            s.copyRange(a, cursor1 + 1, a, dest + 1, count1);
+            if (len1 == 0)
+              break outer;
+          }
+          s.copyElement(tmp, cursor2--, a, dest--);
+          if (--len2 == 1)
+            break outer;
+
+          count2 = len2 - gallopLeft(s.getKey(a, cursor1), tmp, 0, len2, len2 - 1, c);
+          if (count2 != 0) {
+            dest -= count2;
+            cursor2 -= count2;
+            len2 -= count2;
+            s.copyRange(tmp, cursor2 + 1, a, dest + 1, count2);
+            if (len2 <= 1)  // len2 == 1 || len2 == 0
+              break outer;
+          }
+          s.copyElement(a, cursor1--, a, dest--);
+          if (--len1 == 0)
+            break outer;
+          minGallop--;
+        } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP);
+        if (minGallop < 0)
+          minGallop = 0;
+        minGallop += 2;  // Penalize for leaving gallop mode
+      }  // End of "outer" loop
+      this.minGallop = minGallop < 1 ? 1 : minGallop;  // Write back to field
+
+      if (len2 == 1) {
+        assert len1 > 0;
+        dest -= len1;
+        cursor1 -= len1;
+        s.copyRange(a, cursor1 + 1, a, dest + 1, len1);
+        s.copyElement(tmp, cursor2, a, dest); // Move first elt of run2 to front of merge
+      } else if (len2 == 0) {
+        throw new IllegalArgumentException(
+            "Comparison method violates its general contract!");
+      } else {
+        assert len1 == 0;
+        assert len2 > 0;
+        s.copyRange(tmp, 0, a, dest - (len2 - 1), len2);
+      }
+    }
+
+    /**
+     * Ensures that the external array tmp has at least the specified
+     * number of elements, increasing its size if necessary.  The size
+     * increases exponentially to ensure amortized linear time complexity.
+     *
+     * @param minCapacity the minimum required capacity of the tmp array
+     * @return tmp, whether or not it grew
+     */
+    private Buffer ensureCapacity(int minCapacity) {
+      if (tmpLength < minCapacity) {
+        // Compute smallest power of 2 > minCapacity
+        int newSize = minCapacity;
+        newSize |= newSize >> 1;
+        newSize |= newSize >> 2;
+        newSize |= newSize >> 4;
+        newSize |= newSize >> 8;
+        newSize |= newSize >> 16;
+        newSize++;
+
+        if (newSize < 0) // Not bloody likely!
+          newSize = minCapacity;
+        else
+          newSize = Math.min(newSize, aLength >>> 1);
+
+        tmp = s.allocate(newSize);
+        tmpLength = newSize;
+      }
+      return tmp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/85d3596e/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 1a6f1c2..290282c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -254,26 +254,21 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
    * Return an iterator of the map in sorted order. This provides a way to sort the map without
    * using additional memory, at the expense of destroying the validity of the map.
    */
-  def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
+  def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
     destroyed = true
     // Pack KV pairs into the front of the underlying array
     var keyIndex, newIndex = 0
     while (keyIndex < capacity) {
       if (data(2 * keyIndex) != null) {
-        data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
+        data(2 * newIndex) = data(2 * keyIndex)
+        data(2 * newIndex + 1) = data(2 * keyIndex + 1)
         newIndex += 1
       }
       keyIndex += 1
     }
     assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
 
-    // Sort by the given ordering
-    val rawOrdering = new Comparator[AnyRef] {
-      def compare(x: AnyRef, y: AnyRef): Int = {
-        cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
-      }
-    }
-    Arrays.sort(data, 0, newIndex, rawOrdering)
+    new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
 
     new Iterator[(K, V)] {
       var i = 0
@@ -284,7 +279,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
           nullValueReady = false
           (null.asInstanceOf[K], nullValue)
         } else {
-          val item = data(i).asInstanceOf[(K, V)]
+          val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
           i += 1
           item
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/85d3596e/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 765254b..71ab2a3 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BlockManager}
+import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
 
 /**
  * :: DeveloperApi ::
@@ -66,8 +67,6 @@ class ExternalAppendOnlyMap[K, V, C](
     blockManager: BlockManager = SparkEnv.get.blockManager)
   extends Iterable[(K, C)] with Serializable with Logging {
 
-  import ExternalAppendOnlyMap._
-
   private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
@@ -105,7 +104,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private var _diskBytesSpilled = 0L
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
-  private val comparator = new KCComparator[K, C]
+  private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
 
   /**
@@ -173,7 +172,7 @@ class ExternalAppendOnlyMap[K, V, C](
     }
 
     try {
-      val it = currentMap.destructiveSortedIterator(comparator)
+      val it = currentMap.destructiveSortedIterator(keyComparator)
       while (it.hasNext) {
         val kv = it.next()
         writer.write(kv)
@@ -231,7 +230,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
-    private val sortedMap = currentMap.destructiveSortedIterator(comparator)
+    private val sortedMap = currentMap.destructiveSortedIterator(keyComparator)
     private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
@@ -252,7 +251,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
-        val minHash = getKeyHashCode(kc)
+        val minHash = hashKey(kc)
         while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
           kcPairs += kc
@@ -298,7 +297,7 @@ class ExternalAppendOnlyMap[K, V, C](
       val minPair = minPairs.remove(0)
       val minKey = minPair._1
       var minCombiner = minPair._2
-      assert(getKeyHashCode(minPair) == minHash)
+      assert(hashKey(minPair) == minHash)
 
       // For all other streams that may have this key (i.e. have the same minimum key hash),
       // merge in the corresponding value (if any) from that stream
@@ -339,7 +338,7 @@ class ExternalAppendOnlyMap[K, V, C](
       // Invalid if there are no more pairs in this stream
       def minKeyHash: Int = {
         assert(pairs.length > 0)
-        getKeyHashCode(pairs.head)
+        hashKey(pairs.head)
       }
 
       override def compareTo(other: StreamBuffer): Int = {
@@ -423,25 +422,27 @@ class ExternalAppendOnlyMap[K, V, C](
       file.delete()
     }
   }
+
+  /** Convenience function to hash the given (K, C) pair by the key. */
+  private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
 }
 
 private[spark] object ExternalAppendOnlyMap {
 
   /**
-   * Return the key hash code of the given (key, combiner) pair.
-   * If the key is null, return a special hash code.
+   * Return the hash code of the given object. If the object is null, return a special hash code.
    */
-  private def getKeyHashCode[K, C](kc: (K, C)): Int = {
-    if (kc._1 == null) 0 else kc._1.hashCode()
+  private def hash[T](obj: T): Int = {
+    if (obj == null) 0 else obj.hashCode()
   }
 
   /**
-   * A comparator for (key, combiner) pairs based on their key hash codes.
+   * A comparator which sorts arbitrary keys based on their hash codes.
    */
-  private class KCComparator[K, C] extends Comparator[(K, C)] {
-    def compare(kc1: (K, C), kc2: (K, C)): Int = {
-      val hash1 = getKeyHashCode(kc1)
-      val hash2 = getKeyHashCode(kc2)
+  private class HashComparator[K] extends Comparator[K] {
+    def compare(key1: K, key2: K): Int = {
+      val hash1 = hash(key1)
+      val hash2 = hash(key2)
       if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/85d3596e/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
new file mode 100644
index 0000000..ac15289
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * Abstraction for sorting an arbitrary input buffer of data. This interface requires determining
+ * the sort key for a given element index, as well as swapping elements and moving data from one
+ * buffer to another.
+ *
+ * Example format: an array of numbers, where each element is also the key.
+ * See [[KVArraySortDataFormat]] for a more exciting format.
+ *
+ * This trait extends Any to ensure it is universal (and thus compiled to a Java interface).
+ *
+ * @tparam K Type of the sort key of each element
+ * @tparam Buffer Internal data structure used by a particular format (e.g., Array[Int]).
+ */
+// TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity.
+private[spark] trait SortDataFormat[K, Buffer] extends Any {
+  /** Return the sort key for the element at the given index. */
+  protected def getKey(data: Buffer, pos: Int): K
+
+  /** Swap two elements. */
+  protected def swap(data: Buffer, pos0: Int, pos1: Int): Unit
+
+  /** Copy a single element from src(srcPos) to dst(dstPos). */
+  protected def copyElement(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int): Unit
+
+  /**
+   * Copy a range of elements starting at src(srcPos) to dst, starting at dstPos.
+   * Overlapping ranges are allowed.
+   */
+  protected def copyRange(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int, length: Int): Unit
+
+  /**
+   * Allocates a Buffer that can hold up to 'length' elements.
+   * All elements of the buffer should be considered invalid until data is explicitly copied in.
+   */
+  protected def allocate(length: Int): Buffer
+}
+
+/**
+ * Supports sorting an array of key-value pairs where the elements of the array alternate between
+ * keys and values, as used in [[AppendOnlyMap]].
+ *
+ * @tparam K Type of the sort key of each element
+ * @tparam T Type of the Array we're sorting. Typically this must extend AnyRef, to support cases
+ *           when the keys and values are not the same type.
+ */
+private[spark]
+class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K, Array[T]] {
+
+  override protected def getKey(data: Array[T], pos: Int): K = data(2 * pos).asInstanceOf[K]
+
+  override protected def swap(data: Array[T], pos0: Int, pos1: Int) {
+    val tmpKey = data(2 * pos0)
+    val tmpVal = data(2 * pos0 + 1)
+    data(2 * pos0)     = data(2 * pos1)
+    data(2 * pos0 + 1) = data(2 * pos1 + 1)
+    data(2 * pos1)     = tmpKey
+    data(2 * pos1 + 1) = tmpVal
+  }
+
+  override protected def copyElement(src: Array[T], srcPos: Int, dst: Array[T], dstPos: Int) {
+    dst(2 * dstPos) = src(2 * srcPos)
+    dst(2 * dstPos + 1) = src(2 * srcPos + 1)
+  }
+
+  override protected def copyRange(src: Array[T], srcPos: Int,
+                                   dst: Array[T], dstPos: Int, length: Int) {
+    System.arraycopy(src, 2 * srcPos, dst, 2 * dstPos, 2 * length)
+  }
+
+  override protected def allocate(length: Int): Array[T] = {
+    new Array[T](2 * length)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/85d3596e/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index 52c7288..cb99d14 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -170,10 +170,10 @@ class AppendOnlyMapSuite extends FunSuite {
       case e: IllegalStateException => fail()
     }
 
-    val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
-      def compare(kv1: (String, String), kv2: (String, String)): Int = {
-        val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
-        val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
+    val it = map.destructiveSortedIterator(new Comparator[String] {
+      def compare(key1: String, key2: String): Int = {
+        val x = if (key1 != null) key1.toInt else Int.MinValue
+        val y = if (key2 != null) key2.toInt else Int.MinValue
         x.compareTo(y)
       }
     })

http://git-wip-us.apache.org/repos/asf/spark/blob/85d3596e/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
new file mode 100644
index 0000000..6fe1079
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.lang.{Float => JFloat}
+import java.util.{Arrays, Comparator}
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.random.XORShiftRandom
+
+class SorterSuite extends FunSuite {
+
+  test("equivalent to Arrays.sort") {
+    val rand = new XORShiftRandom(123)
+    val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() }
+    val data1 = data0.clone()
+
+    Arrays.sort(data0)
+    new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int)
+
+    data0.zip(data1).foreach { case (x, y) => assert(x === y) }
+  }
+
+  test("KVArraySorter") {
+    val rand = new XORShiftRandom(456)
+
+    // Construct an array of keys (to Java sort) and an array where the keys and values
+    // alternate. Keys are random doubles, values are ordinals from 0 to length.
+    val keys = Array.tabulate[Double](5000) { i => rand.nextDouble() }
+    val keyValueArray = Array.tabulate[Number](10000) { i =>
+      if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
+    }
+
+    // Map from generated keys to values, to verify correctness later
+    val kvMap =
+      keyValueArray.grouped(2).map { case Array(k, v) => k.doubleValue() -> v.intValue() }.toMap
+
+    Arrays.sort(keys)
+    new Sorter(new KVArraySortDataFormat[Double, Number])
+      .sort(keyValueArray, 0, keys.length, Ordering.Double)
+
+    keys.zipWithIndex.foreach { case (k, i) =>
+      assert(k === keyValueArray(2 * i))
+      assert(kvMap(k) === keyValueArray(2 * i + 1))
+    }
+  }
+
+  /**
+   * This provides a simple benchmark for comparing the Sorter with Java internal sorting.
+   * Ideally these would be executed one at a time, each in their own JVM, so their listing
+   * here is mainly to have the code.
+   *
+   * The goal of this code is to sort an array of key-value pairs, where the array physically
+   * has the keys and values alternating. The basic Java sorts work only on the keys, so the
+   * real Java solution is to make Tuple2s to store the keys and values and sort an array of
+   * those, while the Sorter approach can work directly on the input data format.
+   *
+   * Note that the Java implementation varies tremendously between Java 6 and Java 7, when
+   * the Java sort changed from merge sort to Timsort.
+   */
+  ignore("Sorter benchmark") {
+
+    /** Runs an experiment several times. */
+    def runExperiment(name: String)(f: => Unit): Unit = {
+      val firstTry = org.apache.spark.util.Utils.timeIt(1)(f)
+      System.gc()
+
+      var i = 0
+      var next10: Long = 0
+      while (i < 10) {
+        val time = org.apache.spark.util.Utils.timeIt(1)(f)
+        next10 += time
+        println(s"$name: Took $time ms")
+        i += 1
+      }
+
+      println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)")
+    }
+
+    val numElements = 25000000 // 25 mil
+    val rand = new XORShiftRandom(123)
+
+    val keys = Array.tabulate[JFloat](numElements) { i =>
+      new JFloat(rand.nextFloat())
+    }
+
+    // Test our key-value pairs where each element is a Tuple2[Float, Integer)
+    val kvTupleArray = Array.tabulate[AnyRef](numElements) { i =>
+      (keys(i / 2): Float, i / 2: Int)
+    }
+    runExperiment("Tuple-sort using Arrays.sort()") {
+      Arrays.sort(kvTupleArray, new Comparator[AnyRef] {
+        override def compare(x: AnyRef, y: AnyRef): Int =
+          Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1)
+      })
+    }
+
+    // Test our Sorter where each element alternates between Float and Integer, non-primitive
+    val keyValueArray = Array.tabulate[AnyRef](numElements * 2) { i =>
+      if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
+    }
+    val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef])
+    runExperiment("KV-sort using Sorter") {
+      sorter.sort(keyValueArray, 0, keys.length, new Comparator[JFloat] {
+        override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
+      })
+    }
+
+    // Test non-primitive sort on float array
+    runExperiment("Java Arrays.sort()") {
+      Arrays.sort(keys, new Comparator[JFloat] {
+        override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
+      })
+    }
+
+    // Test primitive sort on float array
+    val primitiveKeys = Array.tabulate[Float](numElements) { i => rand.nextFloat() }
+    runExperiment("Java Arrays.sort() on primitive keys") {
+      Arrays.sort(primitiveKeys)
+    }
+  }
+}
+
+
+/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */
+class IntArraySortDataFormat extends SortDataFormat[Int, Array[Int]] {
+  override protected def getKey(data: Array[Int], pos: Int): Int = {
+    data(pos)
+  }
+
+  override protected def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = {
+    val tmp = data(pos0)
+    data(pos0) = data(pos1)
+    data(pos1) = tmp
+  }
+
+  override protected def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) {
+    dst(dstPos) = src(srcPos)
+  }
+
+  /** Copy a range of elements starting at src(srcPos) to dest, starting at destPos. */
+  override protected def copyRange(src: Array[Int], srcPos: Int,
+                                   dst: Array[Int], dstPos: Int, length: Int) {
+    System.arraycopy(src, srcPos, dst, dstPos, length)
+  }
+
+  /** Allocates a new structure that can hold up to 'length' elements. */
+  override protected def allocate(length: Int): Array[Int] = {
+    new Array[Int](length)
+  }
+}