You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2016/03/21 01:43:48 UTC

[26/50] lucene-solr:jira/SOLR-445: LUCENE-7101: OfflineSorter had O(N^2) merge cost, and used too many temporary file descriptors, for large sorts

LUCENE-7101: OfflineSorter had O(N^2) merge cost, and used too many temporary file descriptors, for large sorts


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/82c06190
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/82c06190
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/82c06190

Branch: refs/heads/jira/SOLR-445
Commit: 82c06190a35a8159288c2fb48d8d38d6d81dbbf2
Parents: 5801caa
Author: Mike McCandless <mi...@apache.org>
Authored: Tue Mar 15 07:36:02 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Tue Mar 15 07:36:02 2016 -0400

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |  3 ++
 .../org/apache/lucene/util/OfflineSorter.java   | 44 ++++++++++++++------
 .../org/apache/lucene/util/bkd/BKDWriter.java   | 14 +++----
 .../org/apache/lucene/index/Test2BPoints.java   | 21 +---------
 .../apache/lucene/util/TestOfflineSorter.java   |  2 +-
 .../apache/lucene/util/bkd/Test2BBKDPoints.java |  4 +-
 6 files changed, 44 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/82c06190/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index db08eb3..61737d9 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -196,6 +196,9 @@ Bug Fixes
   On top of that with score mode average, the explain would fail with a NPE.
   (Martijn van Groningen)
 
+* LUCENE-7101: OfflineSorter had O(N^2) merge cost, and used too many
+  temporary file descriptors, for large sorts (Mike McCandless)
+
 Other
 
 * LUCENE-7035: Upgrade icu4j to 56.1/unicode 8. (Robert Muir)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/82c06190/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
index 18e421b..3a22e33 100644
--- a/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/OfflineSorter.java
@@ -64,7 +64,7 @@ public class OfflineSorter {
   /**
    * Maximum number of temporary files before doing an intermediate merge.
    */
-  public final static int MAX_TEMPFILES = 128;
+  public final static int MAX_TEMPFILES = 10;
 
   private final Directory dir;
 
@@ -232,6 +232,7 @@ public class OfflineSorter {
     sortInfo.totalTime = System.currentTimeMillis();
 
     List<String> segments = new ArrayList<>();
+    int[] levelCounts = new int[1];
 
     // So we can remove any partially written temp files on exception:
     TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
@@ -244,15 +245,26 @@ public class OfflineSorter {
         segments.add(sortPartition(trackingDir));
         sortInfo.tempMergeFiles++;
         sortInfo.lineCount += lineCount;
+        levelCounts[0]++;
 
-        // Handle intermediate merges.
-        if (segments.size() == maxTempFiles) {
+        // Handle intermediate merges; we need a while loop to "cascade" the merge when necessary:
+        int mergeLevel = 0;
+        while (levelCounts[mergeLevel] == maxTempFiles) {
           mergePartitions(trackingDir, segments);
+          if (mergeLevel+2 > levelCounts.length) {
+            levelCounts = ArrayUtil.grow(levelCounts, mergeLevel+2);
+          }
+          levelCounts[mergeLevel+1]++;
+          levelCounts[mergeLevel] = 0;
+          mergeLevel++;
         }
       }
+      
+      // TODO: we shouldn't have to do this?  Can't we return a merged reader to
+      // the caller, who often consumes the result just once, instead?
 
-      // Merge the partitions to the output file with a priority queue.
-      if (segments.size() > 1) {     
+      // Merge all partitions down to 1 (basically a forceMerge(1)):
+      while (segments.size() > 1) {     
         mergePartitions(trackingDir, segments);
       }
 
@@ -304,19 +316,25 @@ public class OfflineSorter {
     }
   }
 
-  /** Merge a list of sorted temporary files (partitions) into an output file.  Note that this closes the
-   *  incoming {@link IndexOutput}. */
+  /** Merge the most recent {@code maxTempFiles} partitions into a new partition. */
   void mergePartitions(Directory trackingDir, List<String> segments) throws IOException {
     long start = System.currentTimeMillis();
 
-    PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segments.size()) {
+    List<String> segmentsToMerge;
+    if (segments.size() > maxTempFiles) {
+      segmentsToMerge = segments.subList(segments.size() - maxTempFiles, segments.size());
+    } else {
+      segmentsToMerge = segments;
+    }
+
+    PriorityQueue<FileAndTop> queue = new PriorityQueue<FileAndTop>(segmentsToMerge.size()) {
       @Override
       protected boolean lessThan(FileAndTop a, FileAndTop b) {
         return comparator.compare(a.current.get(), b.current.get()) < 0;
       }
     };
 
-    ByteSequencesReader[] streams = new ByteSequencesReader[segments.size()];
+    ByteSequencesReader[] streams = new ByteSequencesReader[segmentsToMerge.size()];
 
     String newSegmentName = null;
 
@@ -326,8 +344,8 @@ public class OfflineSorter {
       newSegmentName = out.getName();
       
       // Open streams and read the top for each file
-      for (int i = 0; i < segments.size(); i++) {
-        streams[i] = getReader(dir.openInput(segments.get(i), IOContext.READONCE));
+      for (int i = 0; i < segmentsToMerge.size(); i++) {
+        streams[i] = getReader(dir.openInput(segmentsToMerge.get(i), IOContext.READONCE));
         BytesRefBuilder bytes = new BytesRefBuilder();
         boolean result = streams[i].read(bytes);
         assert result;
@@ -354,9 +372,9 @@ public class OfflineSorter {
       IOUtils.close(streams);
     }
 
-    IOUtils.deleteFiles(trackingDir, segments);
+    IOUtils.deleteFiles(trackingDir, segmentsToMerge);
 
-    segments.clear();
+    segmentsToMerge.clear();
     segments.add(newSegmentName);
 
     sortInfo.tempMergeFiles++;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/82c06190/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
index c5cdc30..10d97e3 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
@@ -212,11 +212,11 @@ public class BKDWriter implements Closeable {
     }
   }
 
-  /** If the current segment has too many points then we switchover to temp files / offline sort. */
-  private void switchToOffline() throws IOException {
+  /** If the current segment has too many points then we spill over to temp files / offline sort. */
+  private void spillToOffline() throws IOException {
 
     // For each .add we just append to this input file, then in .finish we sort this input and resursively build the tree:
-    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "switch");
+    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill");
     tempInput = offlinePointWriter.out;
     PointReader reader = heapPointWriter.getReader(0);
     for(int i=0;i<pointCount;i++) {
@@ -235,7 +235,7 @@ public class BKDWriter implements Closeable {
 
     if (pointCount >= maxPointsSortInHeap) {
       if (offlinePointWriter == null) {
-        switchToOffline();
+        spillToOffline();
       }
       offlinePointWriter.append(packedValue, pointCount, docID);
     } else {
@@ -733,7 +733,7 @@ public class BKDWriter implements Closeable {
       // TODO: this is sort of sneaky way to get the final OfflinePointWriter from OfflineSorter:
       IndexOutput[] lastWriter = new IndexOutput[1];
 
-      OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix, cmp, OfflineSorter.BufferSize.megabytes(Math.max(1, (long) maxMBSortInHeap)), OfflineSorter.MAX_TEMPFILES) {
+      OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, OfflineSorter.BufferSize.megabytes(Math.max(1, (long) maxMBSortInHeap)), OfflineSorter.MAX_TEMPFILES) {
 
           /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
           @Override
@@ -742,9 +742,7 @@ public class BKDWriter implements Closeable {
             return new ByteSequencesWriter(out) {
               @Override
               public void write(byte[] bytes, int off, int len) throws IOException {
-                if (len != bytesPerDoc) {
-                  throw new IllegalArgumentException("len=" + len + " bytesPerDoc=" + bytesPerDoc);
-                }
+                assert len == bytesPerDoc: "len=" + len + " bytesPerDoc=" + bytesPerDoc;
                 out.writeBytes(bytes, off, len);
               }
             };

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/82c06190/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java b/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java
index 5766b91..f15ba19 100644
--- a/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java
+++ b/lucene/core/src/test/org/apache/lucene/index/Test2BPoints.java
@@ -49,7 +49,6 @@ import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
 public class Test2BPoints extends LuceneTestCase {
   public void test1D() throws Exception {
     Directory dir = FSDirectory.open(createTempDir("2BPoints1D"));
-    System.out.println("DIR: " + ((FSDirectory) dir).getDirectory());
 
     IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random()))
       .setCodec(getCodec())
@@ -144,24 +143,6 @@ public class Test2BPoints extends LuceneTestCase {
   }
 
   private static Codec getCodec() {
-
-    return new FilterCodec("Lucene60", Codec.forName("Lucene60")) {
-      @Override
-      public PointsFormat pointsFormat() {
-        return new PointsFormat() {
-          @Override
-          public PointsWriter fieldsWriter(SegmentWriteState writeState) throws IOException {
-            int maxPointsInLeafNode = 1024;
-            double maxMBSortInHeap = 256.0;
-            return new Lucene60PointsWriter(writeState, maxPointsInLeafNode, maxMBSortInHeap);
-          }
-
-          @Override
-          public PointsReader fieldsReader(SegmentReadState readState) throws IOException {
-            return new Lucene60PointsReader(readState);
-          }
-        };
-      }
-    };
+    return Codec.forName("Lucene60");
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/82c06190/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
index 5322531..b45a3bb 100644
--- a/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
+++ b/lucene/core/src/test/org/apache/lucene/util/TestOfflineSorter.java
@@ -82,7 +82,7 @@ public class TestOfflineSorter extends LuceneTestCase {
     try (Directory dir = newDirectory()) {
       SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES),
                                     generateRandom((int)OfflineSorter.MB * 20));
-      assertEquals(1, sortInfo.mergeRounds);
+      assertEquals(3, sortInfo.mergeRounds);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/82c06190/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java b/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java
index 4df2ebe..cfb98b0 100644
--- a/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java
+++ b/lucene/core/src/test/org/apache/lucene/util/bkd/Test2BBKDPoints.java
@@ -55,7 +55,7 @@ public class Test2BBKDPoints extends LuceneTestCase {
 
     final int numDocs = (Integer.MAX_VALUE / 26) + 100;
 
-    BKDWriter w = new BKDWriter(numDocs, dir, "_0", 1, Long.BYTES, 1024, 256, 26L * numDocs);
+    BKDWriter w = new BKDWriter(numDocs, dir, "_0", 1, Long.BYTES, 26L * numDocs);
     int counter = 0;
     byte[] packedBytes = new byte[Long.BYTES];
     for (int docID = 0; docID < numDocs; docID++) {
@@ -88,7 +88,7 @@ public class Test2BBKDPoints extends LuceneTestCase {
 
     final int numDocs = (Integer.MAX_VALUE / 26) + 100;
 
-    BKDWriter w = new BKDWriter(numDocs, dir, "_0", 2, Long.BYTES, 1024, 256, 26L * numDocs);
+    BKDWriter w = new BKDWriter(numDocs, dir, "_0", 2, Long.BYTES, 26L * numDocs);
     int counter = 0;
     byte[] packedBytes = new byte[2*Long.BYTES];
     for (int docID = 0; docID < numDocs; docID++) {