You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2017/04/26 10:39:12 UTC

lucene-solr:branch_6x: LUCENE-7792: add optional concurrency to OfflineSorter

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x b90bfaba1 -> bb8dd7a9f


LUCENE-7792: add optional concurrency to OfflineSorter


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/bb8dd7a9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/bb8dd7a9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/bb8dd7a9

Branch: refs/heads/branch_6x
Commit: bb8dd7a9f98dd172a66fe6686402500164d69162
Parents: b90bfab
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Apr 26 06:38:28 2017 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Wed Apr 26 06:38:49 2017 -0400

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |   3 +
 .../codecs/simpletext/SimpleTextBKDWriter.java  |   2 +-
 .../org/apache/lucene/util/OfflineSorter.java   | 381 ++++++++++++-------
 .../lucene/util/SameThreadExecutorService.java  |  69 ++++
 .../org/apache/lucene/util/bkd/BKDWriter.java   |   2 +-
 .../apache/lucene/util/TestOfflineSorter.java   |  50 ++-
 6 files changed, 353 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bb8dd7a9/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index d4aee39..5cc7138 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -26,6 +26,9 @@ Improvements
   Accountable so you can see how much RAM it's using (Robert Muir,
   Mike McCandless)
 
+* LUCENE-7792: OfflineSorter can now run concurrently if you pass it
+  an optional ExecutorService (Dawid Weiss, Mike McCandless)
+
 Optimizations
 
 * LUCENE-7787: spatial-extras HeatmapFacetCounter will now short-circuit it's

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bb8dd7a9/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java
----------------------------------------------------------------------
diff --git a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java
index 31e25c5..b918571 100644
--- a/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java
+++ b/lucene/codecs/src/java/org/apache/lucene/codecs/simpletext/SimpleTextBKDWriter.java
@@ -877,7 +877,7 @@ final class SimpleTextBKDWriter implements Closeable {
         };
       }
 
-      OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc) {
+      OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) {
 
           /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
           @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bb8dd7a9/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
index d273057..b28752a 100644
--- a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
@@ -24,7 +24,12 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
-import java.util.stream.Collectors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.store.ChecksumIndexInput;
@@ -73,6 +78,9 @@ public class OfflineSorter {
   private final int valueLength;
   private final String tempFileNamePrefix;
 
+  private final ExecutorService exec;
+  private final Semaphore partitionsInRAM;
+
   /** 
    * A bit more descriptive unit for constructors.
    * 
@@ -145,13 +153,13 @@ public class OfflineSorter {
     /** number of lines of data read */
     public int lineCount;
     /** time spent merging sorted partitions (in milliseconds) */
-    public long mergeTime;
+    public final AtomicLong mergeTimeMS = new AtomicLong();
     /** time spent sorting data (in milliseconds) */
-    public long sortTime;
+    public final AtomicLong sortTimeMS = new AtomicLong();
     /** total time spent (in milliseconds) */
-    public long totalTime;
+    public long totalTimeMS;
     /** time spent in i/o read (in milliseconds) */
-    public long readTime;
+    public long readTimeMS;
     /** read buffer size (in bytes) */
     public final long bufferSize = ramBufferSize.bytes;
     
@@ -161,17 +169,15 @@ public class OfflineSorter {
     @Override
     public String toString() {
       return String.format(Locale.ROOT,
-          "time=%.2f sec. total (%.2f reading, %.2f sorting, %.2f merging), lines=%d, temp files=%d, merges=%d, soft ram limit=%.2f MB",
-          totalTime / 1000.0d, readTime / 1000.0d, sortTime / 1000.0d, mergeTime / 1000.0d,
-          lineCount, tempMergeFiles, mergeRounds,
-          (double) bufferSize / MB);
+                           "time=%.2f sec. total (%.2f reading, %.2f sorting, %.2f merging), lines=%d, temp files=%d, merges=%d, soft ram limit=%.2f MB",
+                           totalTimeMS / 1000.0d, readTimeMS / 1000.0d, sortTimeMS.get() / 1000.0d, mergeTimeMS.get() / 1000.0d,
+                           lineCount, tempMergeFiles, mergeRounds,
+                           (double) bufferSize / MB);
     }
   }
 
   private final BufferSize ramBufferSize;
   
-  private final Counter bufferBytesUsed = Counter.newCounter();
-  private final SortableBytesRefArray buffer;
   SortInfo sortInfo;
   private int maxTempFiles;
   private final Comparator<BytesRef> comparator;
@@ -185,7 +191,7 @@ public class OfflineSorter {
    * @see BufferSize#automatic()
    */
   public OfflineSorter(Directory dir, String tempFileNamePrefix) throws IOException {
-    this(dir, tempFileNamePrefix, DEFAULT_COMPARATOR, BufferSize.automatic(), MAX_TEMPFILES, -1);
+    this(dir, tempFileNamePrefix, DEFAULT_COMPARATOR, BufferSize.automatic(), MAX_TEMPFILES, -1, null, 0);
   }
   
   /**
@@ -194,14 +200,30 @@ public class OfflineSorter {
    * @see BufferSize#automatic()
    */
   public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator) throws IOException {
-    this(dir, tempFileNamePrefix, comparator, BufferSize.automatic(), MAX_TEMPFILES, -1);
+    this(dir, tempFileNamePrefix, comparator, BufferSize.automatic(), MAX_TEMPFILES, -1, null, 0);
   }
 
   /**
    * All-details constructor.  If {@code valueLength} is -1 (the default), the length of each value differs; otherwise,
-   * all values have the specified length.
+   * all values have the specified length.  If you pass a non-null {@code ExecutorService} then it will be
+   * used to run sorting operations that can be run concurrently, and maxPartitionsInRAM is the maximum
+   * concurrent in-memory partitions.  Thus the maximum possible RAM used by this class while sorting is
+   * {@code maxPartitionsInRAM * ramBufferSize}.
    */
-  public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator, BufferSize ramBufferSize, int maxTempfiles, int valueLength) {
+  public OfflineSorter(Directory dir, String tempFileNamePrefix, Comparator<BytesRef> comparator,
+                       BufferSize ramBufferSize, int maxTempfiles, int valueLength, ExecutorService exec,
+                       int maxPartitionsInRAM) {
+    if (exec != null) {
+      this.exec = exec;
+      if (maxPartitionsInRAM <= 0) {
+        throw new IllegalArgumentException("maxPartitionsInRAM must be > 0; got " + maxPartitionsInRAM);
+      }
+    } else {
+      this.exec = new SameThreadExecutorService();
+      maxPartitionsInRAM = 1;
+    }
+    this.partitionsInRAM = new Semaphore(maxPartitionsInRAM);
+
     if (ramBufferSize.bytes < ABSOLUTE_MIN_SORT_BUFFER_SIZE) {
       throw new IllegalArgumentException(MIN_BUFFER_SIZE_MSG + ": " + ramBufferSize.bytes);
     }
@@ -209,14 +231,11 @@ public class OfflineSorter {
     if (maxTempfiles < 2) {
       throw new IllegalArgumentException("maxTempFiles must be >= 2");
     }
-    if (valueLength == -1) {
-      buffer = new BytesRefArray(bufferBytesUsed);
-    } else {
-      if (valueLength == 0 || valueLength > Short.MAX_VALUE) {
-        throw new IllegalArgumentException("valueLength must be 1 .. " + Short.MAX_VALUE + "; got: " + valueLength);
-      }
-      buffer = new FixedLengthBytesRefArray(valueLength);
+
+    if (valueLength != -1 && (valueLength == 0 || valueLength > Short.MAX_VALUE)) {
+      throw new IllegalArgumentException("valueLength must be 1 .. " + Short.MAX_VALUE + "; got: " + valueLength);
     }
+    
     this.valueLength = valueLength;
     this.ramBufferSize = ramBufferSize;
     this.maxTempFiles = maxTempfiles;
@@ -241,26 +260,28 @@ public class OfflineSorter {
   public String sort(String inputFileName) throws IOException {
     
     sortInfo = new SortInfo();
-    sortInfo.totalTime = System.currentTimeMillis();
+    long startMS = System.currentTimeMillis();
 
-    List<PartitionAndCount> segments = new ArrayList<>();
+    List<Future<Partition>> segments = new ArrayList<>();
     int[] levelCounts = new int[1];
 
     // So we can remove any partially written temp files on exception:
     TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
 
     boolean success = false;
-    boolean[] isExhausted = new boolean[1];
     try (ByteSequencesReader is = getReader(dir.openChecksumInput(inputFileName, IOContext.READONCE), inputFileName)) {
-      while (isExhausted[0] == false) {
-        int lineCount = readPartition(is, isExhausted);
-        if (lineCount == 0) {
-          assert isExhausted[0];
+      while (true) {
+        Partition part = readPartition(is);
+        if (part.count == 0) {
+          assert part.exhausted;
           break;
         }
-        segments.add(sortPartition(trackingDir, lineCount));
+
+        Callable<Partition> job = new SortPartitionTask(trackingDir, part);
+
+        segments.add(exec.submit(job));
         sortInfo.tempMergeFiles++;
-        sortInfo.lineCount += lineCount;
+        sortInfo.lineCount += part.count;
         levelCounts[0]++;
 
         // Handle intermediate merges; we need a while loop to "cascade" the merge when necessary:
@@ -274,6 +295,10 @@ public class OfflineSorter {
           levelCounts[mergeLevel] = 0;
           mergeLevel++;
         }
+
+        if (part.exhausted) {
+          break;
+        }
       }
       
       // TODO: we shouldn't have to do this?  Can't we return a merged reader to
@@ -292,13 +317,22 @@ public class OfflineSorter {
           result = out.getName();
         }
       } else {
-        result = segments.get(0).fileName;
+        try {
+          result = segments.get(0).get().fileName;
+        } catch (InterruptedException ie) {
+          throw new ThreadInterruptedException(ie);
+        } catch (ExecutionException ee) {
+          IOUtils.reThrow(ee.getCause());
+
+          // dead code but javac disagrees:
+          result = null;
+        }
       }
 
       // We should be explicitly removing all intermediate files ourselves unless there is an exception:
       assert trackingDir.getCreatedFiles().size() == 1 && trackingDir.getCreatedFiles().contains(result);
 
-      sortInfo.totalTime = System.currentTimeMillis() - sortInfo.totalTime; 
+      sortInfo.totalTimeMS = System.currentTimeMillis() - startMS;
 
       CodecUtil.checkFooter(is.in);
 
@@ -306,6 +340,8 @@ public class OfflineSorter {
 
       return result;
 
+    } catch (InterruptedException ie) {
+      throw new ThreadInterruptedException(ie);
     } finally {
       if (success == false) {
         IOUtils.deleteFilesIgnoringExceptions(trackingDir, trackingDir.getCreatedFiles());
@@ -313,36 +349,6 @@ public class OfflineSorter {
     }
   }
 
-  /** Sort a single partition in-memory. */
-  protected PartitionAndCount sortPartition(TrackingDirectoryWrapper trackingDir, int lineCount) throws IOException {
-
-    try (IndexOutput tempFile = trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT);
-         ByteSequencesWriter out = getWriter(tempFile, lineCount);) {
-      
-      BytesRef spare;
-
-      long start = System.currentTimeMillis();
-      BytesRefIterator iter = buffer.iterator(comparator);
-      sortInfo.sortTime += System.currentTimeMillis() - start;
-
-      int count = 0;
-      while ((spare = iter.next()) != null) {
-        assert spare.length <= Short.MAX_VALUE;
-        out.write(spare);
-        count++;
-      }
-
-      assert count == lineCount;
-      
-      // Clean up the buffer for the next partition.
-      buffer.clear();
-
-      CodecUtil.writeFooter(out.out);
-
-      return new PartitionAndCount(lineCount, tempFile.getName());
-    }
-  }
-
   /** Called on exception, to check whether the checksum is also corrupt in this source, and add that 
    *  information (checksum matched or didn't) as a suppressed exception. */
   private void verifyChecksum(Throwable priorException, ByteSequencesReader reader) throws IOException {
@@ -352,93 +358,61 @@ public class OfflineSorter {
   }
 
   /** Merge the most recent {@code maxTempFile} partitions into a new partition. */
-  void mergePartitions(Directory trackingDir, List<PartitionAndCount> segments) throws IOException {
+  void mergePartitions(Directory trackingDir, List<Future<Partition>> segments) throws IOException {
     long start = System.currentTimeMillis();
-
-    List<PartitionAndCount> segmentsToMerge;
+    List<Future<Partition>> segmentsToMerge;
     if (segments.size() > maxTempFiles) {
       segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size());
     } else {
       segmentsToMerge = segments;
     }
 
-    long totalCount = 0;
-    for (PartitionAndCount segment : segmentsToMerge) {
-      totalCount += segment.count;
-    }
-
-    PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
-      @Override
-      protected boolean lessThan(FileAndTop a, FileAndTop b) {
-        return comparator.compare(a.current, b.current) < 0;
-      }
-    };
-
-    ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()];
-
-    String newSegmentName = null;
+    sortInfo.mergeRounds++;
 
-    try (ByteSequencesWriter writer = getWriter(trackingDir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) {
-
-      newSegmentName = writer.out.getName();
-      
-      // Open streams and read the top for each file
-      for (int i = 0; i < segmentsToMerge.size(); i++) {
-        streams[i] = getReader(dir.openChecksumInput(segmentsToMerge.get(i).fileName, IOContext.READONCE), segmentsToMerge.get(i).fileName);
-        BytesRef item = null;
-        try {
-          item = streams[i].next();
-        } catch (Throwable t) {
-          verifyChecksum(t, streams[i]);
-        }
-        assert item != null;
-        queue.insertWithOverflow(new FileAndTop(i, item));
-      }
-  
-      // Unix utility sort() uses ordered array of files to pick the next line from, updating
-      // it as it reads new lines. The PQ used here is a more elegant solution and has 
-      // a nicer theoretical complexity bound :) The entire sorting process is I/O bound anyway
-      // so it shouldn't make much of a difference (didn't check).
-      FileAndTop top;
-      while ((top = queue.top()) != null) {
-        writer.write(top.current);
-        try {
-          top.current = streams[top.fd].next();
-        } catch (Throwable t) {
-          verifyChecksum(t, streams[top.fd]);
-        }
+    MergePartitionsTask task = new MergePartitionsTask(trackingDir, new ArrayList<>(segmentsToMerge));
 
-        if (top.current != null) {
-          queue.updateTop();
-        } else {
-          queue.pop();
-        }
-      }
+    segmentsToMerge.clear();
+    segments.add(exec.submit(task));
 
-      CodecUtil.writeFooter(writer.out);
+    sortInfo.tempMergeFiles++;
+  }
 
-      for(ByteSequencesReader reader : streams) {
-        CodecUtil.checkFooter(reader.in);
-      }
-  
-      sortInfo.mergeTime += System.currentTimeMillis() - start;
-      sortInfo.mergeRounds++;
-    } finally {
-      IOUtils.close(streams);
+  /** Holds one partition of items, either loaded into memory or based on a file. */
+  private static class Partition {
+    public final SortableBytesRefArray buffer;
+    public final boolean exhausted;
+    public final long count;
+    public final String fileName;
+
+    /** A partition loaded into memory. */
+    public Partition(SortableBytesRefArray buffer, boolean exhausted) {
+      this.buffer = buffer;
+      this.fileName = null;
+      this.count = buffer.size();
+      this.exhausted = exhausted;
     }
 
-    IOUtils.deleteFiles(trackingDir, segmentsToMerge.stream().map(segment -> segment.fileName).collect(Collectors.toList()));
-
-    segmentsToMerge.clear();
-    segments.add(new PartitionAndCount(totalCount, newSegmentName));
-
-    sortInfo.tempMergeFiles++;
+    /** An on-disk partition. */
+    public Partition(String fileName, long count) {
+      this.buffer = null;
+      this.fileName = fileName;
+      this.count = count;
+      this.exhausted = true;
+    }
   }
 
   /** Read in a single partition of data, setting isExhausted[0] to true if there are no more items. */
-  int readPartition(ByteSequencesReader reader, boolean[] isExhausted) throws IOException {
+  Partition readPartition(ByteSequencesReader reader) throws IOException, InterruptedException {
+    if (partitionsInRAM != null) {
+      partitionsInRAM.acquire();
+    }
     long start = System.currentTimeMillis();
+    SortableBytesRefArray buffer;
+    boolean exhausted = false;
+    int count;
     if (valueLength != -1) {
+      // fixed length case
+      buffer = new FixedLengthBytesRefArray(valueLength);
       int limit = ramBufferSize.bytes / valueLength;
       for(int i=0;i<limit;i++) {
         BytesRef item = null;
@@ -448,12 +422,14 @@ public class OfflineSorter {
           verifyChecksum(t, reader);
         }
         if (item == null) {
-          isExhausted[0] = true;
+          exhausted = true;
           break;
         }
         buffer.append(item);
       }
     } else {
+      Counter bufferBytesUsed = Counter.newCounter();
+      buffer = new BytesRefArray(bufferBytesUsed);
       while (true) {
         BytesRef item = null;
         try {
@@ -462,7 +438,7 @@ public class OfflineSorter {
           verifyChecksum(t, reader);
         }
         if (item == null) {
-          isExhausted[0] = true;
+          exhausted = true;
           break;
         }
         buffer.append(item);
@@ -473,8 +449,9 @@ public class OfflineSorter {
         }
       }
     }
-    sortInfo.readTime += System.currentTimeMillis() - start;
-    return buffer.size();
+    sortInfo.readTimeMS += System.currentTimeMillis() - start;
+
+    return new Partition(buffer, exhausted);
   }
 
   static class FileAndTop {
@@ -606,13 +583,133 @@ public class OfflineSorter {
     return comparator;
   }
 
-  private static class PartitionAndCount {
-    final long count;
-    final String fileName;
+  /** Sorts one in-memory partition, writes it to disk, and returns the resulting file-based partition. */
+  private class SortPartitionTask implements Callable<Partition> {
 
-    public PartitionAndCount(long count, String fileName) {
-      this.count = count;
-      this.fileName = fileName;
+    private final Directory dir;
+    private final Partition part;
+      
+    public SortPartitionTask(Directory dir, Partition part) {
+      this.dir = dir;
+      this.part = part;
+    }
+    
+    @Override
+    public Partition call() throws IOException {
+      try (IndexOutput tempFile = dir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT);
+           ByteSequencesWriter out = getWriter(tempFile, part.buffer.size());) {
+      
+        BytesRef spare;
+
+        long startMS = System.currentTimeMillis();
+        BytesRefIterator iter = part.buffer.iterator(comparator);
+        sortInfo.sortTimeMS.addAndGet(System.currentTimeMillis() - startMS);
+
+        int count = 0;
+        while ((spare = iter.next()) != null) {
+          assert spare.length <= Short.MAX_VALUE;
+          out.write(spare);
+          count++;
+        }
+
+        assert count == part.count;
+
+        CodecUtil.writeFooter(out.out);
+        part.buffer.clear();
+        if (partitionsInRAM != null) {
+          partitionsInRAM.release();
+        }
+
+        return new Partition(tempFile.getName(), part.count);
+      }
+    }
+  }
+
+  /** Merges multiple file-based partitions to a single on-disk partition. */
+  private class MergePartitionsTask implements Callable<Partition> {
+    private final Directory dir;
+    private final List<Future<Partition>> segmentsToMerge;
+    
+    public MergePartitionsTask(Directory dir, List<Future<Partition>> segmentsToMerge) {
+      this.dir = dir;
+      this.segmentsToMerge = segmentsToMerge;
+    }
+
+    @Override
+    public Partition call() throws IOException, InterruptedException, ExecutionException {
+      long totalCount = 0;
+      for (Future<Partition> segment : segmentsToMerge) {
+        totalCount += segment.get().count;
+      }
+
+      PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
+          @Override
+          protected boolean lessThan(FileAndTop a, FileAndTop b) {
+            return comparator.compare(a.current, b.current) < 0;
+          }
+        };
+
+      ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()];
+
+      String newSegmentName = null;
+
+      long startMS = System.currentTimeMillis();
+      try (ByteSequencesWriter writer = getWriter(dir.createTempOutput(tempFileNamePrefix, "sort", IOContext.DEFAULT), totalCount)) {
+
+        newSegmentName = writer.out.getName();
+      
+        // Open streams and read the top for each file
+        for (int i = 0; i < segmentsToMerge.size(); i++) {
+          Partition segment = segmentsToMerge.get(i).get();
+          streams[i] = getReader(dir.openChecksumInput(segment.fileName, IOContext.READONCE), segment.fileName);
+              
+          BytesRef item = null;
+          try {
+            item = streams[i].next();
+          } catch (Throwable t) {
+            verifyChecksum(t, streams[i]);
+          }
+          assert item != null;
+          queue.insertWithOverflow(new FileAndTop(i, item));
+        }
+  
+        // Unix utility sort() uses ordered array of files to pick the next line from, updating
+        // it as it reads new lines. The PQ used here is a more elegant solution and has 
+        // a nicer theoretical complexity bound :) The entire sorting process is I/O bound anyway
+        // so it shouldn't make much of a difference (didn't check).
+        FileAndTop top;
+        while ((top = queue.top()) != null) {
+          writer.write(top.current);
+          try {
+            top.current = streams[top.fd].next();
+          } catch (Throwable t) {
+            verifyChecksum(t, streams[top.fd]);
+          }
+
+          if (top.current != null) {
+            queue.updateTop();
+          } else {
+            queue.pop();
+          }
+        }
+
+        CodecUtil.writeFooter(writer.out);
+
+        for(ByteSequencesReader reader : streams) {
+          CodecUtil.checkFooter(reader.in);
+        }
+
+        sortInfo.mergeTimeMS.addAndGet(System.currentTimeMillis() - startMS);
+      } finally {
+        IOUtils.close(streams);
+      }
+      List<String> toDelete = new ArrayList<>();
+      for (Future<Partition> segment : segmentsToMerge) {
+        toDelete.add(segment.get().fileName);
+      }
+      IOUtils.deleteFiles(dir, toDelete);
+
+      return new Partition(newSegmentName, totalCount);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bb8dd7a9/lucene/core/src/java/org/apache/lucene/util/SameThreadExecutorService.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/SameThreadExecutorService.java b/lucene/core/src/java/org/apache/lucene/util/SameThreadExecutorService.java
new file mode 100644
index 0000000..169b9f8
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/util/SameThreadExecutorService.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.util;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** An {@code ExecutorService} that executes tasks immediately in the calling thread during submit; once shut down, further tasks are rejected with {@code RejectedExecutionException}. */
+class SameThreadExecutorService extends AbstractExecutorService {
+  private volatile boolean shutdown;
+
+  @Override
+  public void execute(Runnable command) {
+    checkShutdown(); // throws RejectedExecutionException once shutdown() has been called
+    command.run(); // runs synchronously on the caller's thread; nothing is queued
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    shutdown();
+    return Collections.emptyList(); // tasks run inline in execute, so there are never pending tasks to hand back
+  }
+
+  @Override
+  public void shutdown() {
+    this.shutdown = true;
+  }
+
+  @Override
+  public boolean isTerminated() {
+    // Simplified: we don't check for any threads hanging in execute (we could
+    // introduce an atomic counter, but there seems to be no point).
+    return shutdown == true;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return shutdown == true;
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    // Never waits and always reports termination, ignoring the timeout. See comment in isTerminated();
+    return true;
+  }
+
+  private void checkShutdown() {
+    if (shutdown) {
+      throw new RejectedExecutionException("Executor is shut down.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bb8dd7a9/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
index 93e42c8..ffef970 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
@@ -884,7 +884,7 @@ public class BKDWriter implements Closeable {
         };
       }
 
-      OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc) {
+      OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) {
 
           /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
           @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bb8dd7a9/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
index 839f103..68ac0a2 100644
--- a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
+++ b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
@@ -24,6 +24,10 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.codecs.CodecUtil;
@@ -73,11 +77,25 @@ public class TestOfflineSorter extends LuceneTestCase {
     }
   }
 
+  private ExecutorService randomExecutorServiceOrNull() { // returns either null (no executor) or a freshly created thread pool, chosen at random
+    if (random().nextBoolean()) {
+      return null;
+    } else {
+      return new ThreadPoolExecutor(1, TestUtil.nextInt(random(), 2, 6), Long.MAX_VALUE, TimeUnit.MILLISECONDS, // NOTE(review): core size is 1 and the queue below is unbounded, so the pool presumably never grows past one thread and the random max (2..6) has no effect — confirm against ThreadPoolExecutor docs
+                                    new LinkedBlockingQueue<Runnable>(),
+                                    new NamedThreadFactory("TestIndexSearcher")); // NOTE(review): thread-name prefix looks copied from another test — confirm it is intended here
+    }
+  }
+
   public void testIntermediateMerges() throws Exception {
     // Sort 20 mb worth of data with 1mb buffer, binary merging.
     try (Directory dir = newDirectory()) {
-      SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2, -1), 
+      ExecutorService exec = randomExecutorServiceOrNull();
+      SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2, -1, exec, TestUtil.nextInt(random(), 1, 4)),
                                 generateRandom((int)OfflineSorter.MB * 20));
+      if (exec != null) {
+        exec.shutdownNow();
+      }
       assertTrue(info.mergeRounds > 10);
     }
   }
@@ -85,8 +103,12 @@ public class TestOfflineSorter extends LuceneTestCase {
   public void testSmallRandom() throws Exception {
     // Sort 20 mb worth of data with 1mb buffer.
     try (Directory dir = newDirectory()) {
-      SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, -1),
+      ExecutorService exec = randomExecutorServiceOrNull();
+      SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, -1, exec, TestUtil.nextInt(random(), 1, 4)),
                                     generateRandom((int)OfflineSorter.MB * 20));
+      if (exec != null) {
+        exec.shutdownNow();
+      }
       assertEquals(3, sortInfo.mergeRounds);
     }
   }
@@ -95,8 +117,12 @@ public class TestOfflineSorter extends LuceneTestCase {
   public void testLargerRandom() throws Exception {
     // Sort 100MB worth of data with 15mb buffer.
     try (Directory dir = newFSDirectory(createTempDir())) {
-      checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES, -1), 
+      ExecutorService exec = randomExecutorServiceOrNull(); // may be null, in which case no executor is handed to the sorter
+      checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES, -1, exec, TestUtil.nextInt(random(), 1, 4)), // NOTE(review): buffer is 16 MB although the method comment says 15mb
                 generateRandom((int)OfflineSorter.MB * 100));
+      if (exec != null) { // NOTE(review): not reached when checkSort throws; a try/finally would guarantee the shutdown
+        exec.shutdownNow();
+      }
     }
   }
 
@@ -358,7 +384,7 @@ public class TestOfflineSorter extends LuceneTestCase {
       writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3)));
 
       CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> {
-          new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1).sort(unsorted.getName());
+          new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1, null, 0).sort(unsorted.getName());
         });
       assertTrue(e.getMessage().contains("checksum failed (hardware problem?)"));
     }
@@ -408,7 +434,7 @@ public class TestOfflineSorter extends LuceneTestCase {
       writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3)));
 
       EOFException e = expectThrows(EOFException.class, () -> {
-          new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1).sort(unsorted.getName());
+          new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1, null, 0).sort(unsorted.getName());
         });
       assertEquals(1, e.getSuppressed().length);
       assertTrue(e.getSuppressed()[0] instanceof CorruptIndexException);
@@ -430,8 +456,12 @@ public class TestOfflineSorter extends LuceneTestCase {
       CodecUtil.writeFooter(out);
     }
 
-    OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES);
+    ExecutorService exec = randomExecutorServiceOrNull();
+    OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES, exec, TestUtil.nextInt(random(), 1, 4));
     sorter.sort(out.getName());
+    if (exec != null) {
+      exec.shutdownNow();
+    }
     // 1 MB of ints with 4 MH heap allowed should have been sorted in a single heap partition:
     assertEquals(0, sorter.sortInfo.mergeRounds);
     dir.close();
@@ -448,7 +478,7 @@ public class TestOfflineSorter extends LuceneTestCase {
       CodecUtil.writeFooter(out);
     }
 
-    OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Long.BYTES);
+    OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Long.BYTES, null, 0);
     IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
       sorter.sort(out.getName());
       });
@@ -467,7 +497,7 @@ public class TestOfflineSorter extends LuceneTestCase {
       CodecUtil.writeFooter(out);
     }
 
-    new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES) {
+    new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES, null, 0) {
       @Override
       protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
         ByteSequencesReader other = super.getReader(in, name);
@@ -502,13 +532,13 @@ public class TestOfflineSorter extends LuceneTestCase {
     e = expectThrows(IllegalArgumentException.class,
                      () -> {
                        new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR,
-                                         BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, 0);
+                                         BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, 0, null, 0);
                      });
     assertEquals("valueLength must be 1 .. 32767; got: 0", e.getMessage());
     e = expectThrows(IllegalArgumentException.class,
                      () -> {
                        new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR,
-                                         BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, Integer.MAX_VALUE);
+                                         BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, Integer.MAX_VALUE, null, 0);
                      });
     assertEquals("valueLength must be 1 .. 32767; got: 2147483647", e.getMessage());
   }