Posted to commits@lucene.apache.org by mi...@apache.org on 2016/03/21 10:48:35 UTC

lucene-solr:master: LUCENE-7120: re-use readers while recursing in BKDWriter so we get more thorough checksum verification on its temp files

Repository: lucene-solr
Updated Branches:
  refs/heads/master 5a40ae030 -> 0e189bca2


LUCENE-7120: re-use readers while recursing in BKDWriter so we get more thorough checksum verification on its temp files


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0e189bca
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0e189bca
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0e189bca

Branch: refs/heads/master
Commit: 0e189bca2d2af8bbf3dacaa27212dc9824a3b845
Parents: 5a40ae0
Author: Mike McCandless <mi...@apache.org>
Authored: Mon Mar 21 05:49:47 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Mon Mar 21 05:49:47 2016 -0400

----------------------------------------------------------------------
 .../org/apache/lucene/util/bkd/BKDWriter.java   | 47 +++++++++++---------
 .../apache/lucene/util/bkd/HeapPointWriter.java |  6 +++
 .../lucene/util/bkd/OfflinePointReader.java     |  5 +++
 .../lucene/util/bkd/OfflinePointWriter.java     | 37 ++++++++++++++-
 .../org/apache/lucene/util/bkd/PointWriter.java |  4 ++
 5 files changed, 78 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
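
For context before the per-file diffs: a BKD temp file ends in a checksum footer, but a reader only
verifies that footer if it reads the file all the way to the end. Previously each recursion step in
BKDWriter opened its own reader over just its slice of the file, so the footer was usually never
checked. With this change a writer hands out one shared reader per temp file, the recursion consumes
contiguous slices in file order, and the checksum gets verified once the whole file has been swept;
shared readers are also collected into a list ("toCloseHeroically") so they can still be closed if an
exception unwinds the recursion. Below is a minimal, self-contained sketch of that idea in plain Java;
the class name, method names and file layout (a payload followed by an 8-byte CRC32 footer) are
illustrative only, not the actual Lucene API:

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

/** One reader per temp file: slices must be consumed in file order, and the CRC32 footer is only
 *  compared against the running checksum once the whole payload has been swept. */
final class SharedSliceReader implements Closeable {
  private final CheckedInputStream checked;  // accumulates a CRC32 over every payload byte we read
  private final DataInputStream in;
  private final long payloadLength;          // file length minus the 8-byte checksum footer
  private long position;                     // next byte offset a caller is expected to request

  SharedSliceReader(Path file) throws IOException {
    this.payloadLength = Files.size(file) - Long.BYTES;
    this.checked = new CheckedInputStream(Files.newInputStream(file), new CRC32());
    this.in = new DataInputStream(checked);
  }

  /** Returns the next contiguous slice; a non-sequential request is a bug in the caller. */
  byte[] readSlice(long start, int length) throws IOException {
    if (start != position) {
      throw new IllegalStateException("slices must be read sequentially: start=" + start + " but expected " + position);
    }
    byte[] slice = new byte[length];
    in.readFully(slice);
    position += length;
    return slice;
  }

  @Override
  public void close() throws IOException {
    try {
      if (position == payloadLength) {
        // Full sweep done: capture the running CRC before the footer bytes are read through the stream:
        long actual = checked.getChecksum().getValue();
        long expected = in.readLong();  // footer the producer appended when it closed the file
        if (expected != actual) {
          throw new IOException("checksum mismatch: expected=" + expected + " actual=" + actual);
        }
      }
    } finally {
      in.close();
    }
  }
}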


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0e189bca/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
index 488fa43..5f36155 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java
@@ -227,7 +227,7 @@ public class BKDWriter implements Closeable {
   private void spillToOffline() throws IOException {
 
     // For each .add we just append to this input file, then in .finish we sort this input and recursively build the tree:
-    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill", singleValuePerDoc);
+    offlinePointWriter = new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, "spill", 0, singleValuePerDoc);
     tempInput = offlinePointWriter.out;
     PointReader reader = heapPointWriter.getReader(0, pointCount);
     for(int i=0;i<pointCount;i++) {
@@ -851,6 +851,9 @@ public class BKDWriter implements Closeable {
     // Sort all docs once by each dimension:
     PathSlice[] sortedPointWriters = new PathSlice[numDims];
 
+    // This is only used on exception; on normal code paths we close all files we opened:
+    List<Closeable> toCloseHeroically = new ArrayList<>();
+
     boolean success = false;
     try {
       //long t0 = System.nanoTime();
@@ -872,7 +875,8 @@ public class BKDWriter implements Closeable {
             ordBitSet, out,
             minPackedValue, maxPackedValue,
             splitPackedValues,
-            leafBlockFPs);
+            leafBlockFPs,
+            toCloseHeroically);
 
       for(PathSlice slice : sortedPointWriters) {
         slice.writer.destroy();
@@ -886,11 +890,8 @@ public class BKDWriter implements Closeable {
       success = true;
     } finally {
       if (success == false) {
-        if (tempInput != null) {
-          IOUtils.closeWhileHandlingException(tempInput);
-        }
         IOUtils.deleteFilesIgnoringExceptions(tempDir, tempDir.getCreatedFiles());
-        tempInput = null;
+        IOUtils.closeWhileHandlingException(toCloseHeroically);
       }
     }
 
@@ -1010,6 +1011,8 @@ public class BKDWriter implements Closeable {
     // Now we mark ords that fall into the right half, so we can partition on all other dims that are not the split dim:
 
     // Read the split value, then mark all ords in the right tree (larger than the split value):
+
+    // TODO: find a way to also checksum this reader?  If we changed to markLeftTree, and scanned the final chunk, it could work?
     try (PointReader reader = source.writer.getReader(source.start + source.count - rightCount, rightCount)) {
       boolean result = reader.next();
       assert result;
@@ -1069,12 +1072,11 @@ public class BKDWriter implements Closeable {
   }
 
   /** Pull a partition back into heap once the point count is low enough while recursing. */
-  private PathSlice switchToHeap(PathSlice source) throws IOException {
+  private PathSlice switchToHeap(PathSlice source, List<Closeable> toCloseHeroically) throws IOException {
     int count = Math.toIntExact(source.count);
-    try (
-       PointWriter writer = new HeapPointWriter(count, count, packedBytesLength, longOrds, singleValuePerDoc);
-       PointReader reader = source.writer.getReader(source.start, source.count);
-       ) {
+    // Not inside the try because we don't want to close it here:
+    PointReader reader = source.writer.getSharedReader(source.start, source.count, toCloseHeroically);
+    try (PointWriter writer = new HeapPointWriter(count, count, packedBytesLength, longOrds, singleValuePerDoc)) {
       for(int i=0;i<count;i++) {
         boolean hasNext = reader.next();
         assert hasNext;
@@ -1096,7 +1098,8 @@ public class BKDWriter implements Closeable {
                      IndexOutput out,
                      byte[] minPackedValue, byte[] maxPackedValue,
                      byte[] splitPackedValues,
-                     long[] leafBlockFPs) throws IOException {
+                     long[] leafBlockFPs,
+                     List<Closeable> toCloseHeroically) throws IOException {
 
     for(PathSlice slice : slices) {
       assert slice.count == slices[0].count;
@@ -1104,7 +1107,7 @@ public class BKDWriter implements Closeable {
 
     if (numDims == 1 && slices[0].writer instanceof OfflinePointWriter && slices[0].count <= maxPointsSortInHeap) {
       // Special case for 1D, to cutover to heap once we recurse deeply enough:
-      slices[0] = switchToHeap(slices[0]);
+      slices[0] = switchToHeap(slices[0], toCloseHeroically);
     }
 
     if (nodeID >= leafNodeOffset) {
@@ -1114,7 +1117,7 @@ public class BKDWriter implements Closeable {
         if (slices[dim].writer instanceof HeapPointWriter == false) {
           // Adversarial cases can cause this, e.g. very lopsided data, all equal points, such that we started
           // offline, but then kept splitting only in one dimension, and so never had to rewrite into heap writer
-          slices[dim] = switchToHeap(slices[dim]);
+          slices[dim] = switchToHeap(slices[dim], toCloseHeroically);
         }
 
         PathSlice source = slices[dim];
@@ -1212,7 +1215,8 @@ public class BKDWriter implements Closeable {
       for(int dim=0;dim<numDims;dim++) {
 
         if (dim == splitDim) {
-          // No need to partition on this dim since it's a simple slice of the incoming already sorted slice.
+          // No need to partition on this dim, since it's a simple slice of the incoming, already-sorted slice, and we
+          // will re-use its shared reader when visiting it as we recurse:
           leftSlices[dim] = new PathSlice(source.writer, source.start, leftCount);
           rightSlices[dim] = new PathSlice(source.writer, source.start + leftCount, rightCount);
           System.arraycopy(splitValue, 0, minSplitPackedValue, dim*bytesPerDim, bytesPerDim);
@@ -1220,9 +1224,12 @@ public class BKDWriter implements Closeable {
           continue;
         }
 
+        // Not inside the try because we don't want to close this one now, so that after recursion is done,
+        // we will have done a single full sweep of the file:
+        PointReader reader = slices[dim].writer.getSharedReader(slices[dim].start, slices[dim].count, toCloseHeroically);
+
         try (PointWriter leftPointWriter = getPointWriter(leftCount, "left" + dim);
-             PointWriter rightPointWriter = getPointWriter(source.count - leftCount, "right" + dim);
-             PointReader reader = slices[dim].writer.getReader(slices[dim].start, slices[dim].count);) {
+             PointWriter rightPointWriter = getPointWriter(source.count - leftCount, "right" + dim)) {
 
           long nextRightCount = reader.split(source.count, ordBitSet, leftPointWriter, rightPointWriter, dim == dimToClear);
           if (rightCount != nextRightCount) {
@@ -1240,7 +1247,7 @@ public class BKDWriter implements Closeable {
       build(2*nodeID, leafNodeOffset, leftSlices,
             ordBitSet, out,
             minPackedValue, maxSplitPackedValue,
-            splitPackedValues, leafBlockFPs);
+            splitPackedValues, leafBlockFPs, toCloseHeroically);
       for(int dim=0;dim<numDims;dim++) {
         // Don't destroy the dim we split on because we just re-used what our caller above gave us for that dim:
         if (dim != splitDim) {
@@ -1253,7 +1260,7 @@ public class BKDWriter implements Closeable {
       build(2*nodeID+1, leafNodeOffset, rightSlices,
             ordBitSet, out,
             minSplitPackedValue, maxPackedValue,
-            splitPackedValues, leafBlockFPs);
+            splitPackedValues, leafBlockFPs, toCloseHeroically);
       for(int dim=0;dim<numDims;dim++) {
         // Don't destroy the dim we split on because we just re-used what our caller above gave us for that dim:
         if (dim != splitDim) {
@@ -1277,7 +1284,7 @@ public class BKDWriter implements Closeable {
       int size = Math.toIntExact(count);
       return new HeapPointWriter(size, size, packedBytesLength, longOrds, singleValuePerDoc);
     } else {
-      return new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, desc, singleValuePerDoc);
+      return new OfflinePointWriter(tempDir, tempFileNamePrefix, packedBytesLength, longOrds, desc, count, singleValuePerDoc);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0e189bca/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java
index 1bcf836..470f585 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/HeapPointWriter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.lucene.util.bkd;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -154,6 +155,11 @@ final class HeapPointWriter implements PointWriter {
   }
 
   @Override
+  public PointReader getSharedReader(long start, long length, List<Closeable> toCloseHeroically) {
+    return new HeapPointReader(blocks, valuesPerBlock, packedBytesLength, ords, ordsLong, docIDs, (int) start, nextWrite, singleValuePerDoc);
+  }
+
+  @Override
   public void close() {
     closed = true;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0e189bca/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java b/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java
index 18afda4..ebb2fd1 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointReader.java
@@ -39,6 +39,9 @@ final class OfflinePointReader extends PointReader {
   private boolean longOrds;
   private boolean checked;
 
+  // File name we are reading
+  final String name;
+
   OfflinePointReader(Directory tempDir, String tempFileName, int packedBytesLength, long start, long length,
                      boolean longOrds, boolean singleValuePerDoc) throws IOException {
     this.singleValuePerDoc = singleValuePerDoc;
@@ -67,6 +70,7 @@ final class OfflinePointReader extends PointReader {
       // at another level of the BKDWriter recursion
       in = tempDir.openInput(tempFileName, IOContext.READONCE);
     }
+    name = tempFileName;
 
     long seekFP = start * bytesPerDoc;
     in.seek(seekFP);
@@ -121,6 +125,7 @@ final class OfflinePointReader extends PointReader {
   public void close() throws IOException {
     try {
       if (countLeft == 0 && in instanceof ChecksumIndexInput && checked == false) {
+        //System.out.println("NOW CHECK: " + name);
         checked = true;
         CodecUtil.checkFooter((ChecksumIndexInput) in);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0e189bca/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java
index 5314876..2261b81 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/OfflinePointWriter.java
@@ -16,9 +16,12 @@
  */
 package org.apache.lucene.util.bkd;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.store.ChecksumIndexInput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
@@ -34,13 +37,19 @@ final class OfflinePointWriter implements PointWriter {
   private boolean closed;
   // true if ords are written as long (8 bytes), else 4 bytes
   private boolean longOrds;
+  private OfflinePointReader sharedReader;
+  private long nextSharedRead;
+  final long expectedCount;
 
-  public OfflinePointWriter(Directory tempDir, String tempFileNamePrefix, int packedBytesLength, boolean longOrds, String desc, boolean singleValuePerDoc) throws IOException {
+  /** Create a new writer; {@code expectedCount} is the number of points that will be added, or 0 if that is not known up front */
+  public OfflinePointWriter(Directory tempDir, String tempFileNamePrefix, int packedBytesLength,
+                            boolean longOrds, String desc, long expectedCount, boolean singleValuePerDoc) throws IOException {
     this.out = tempDir.createTempOutput(tempFileNamePrefix, "bkd_" + desc, IOContext.DEFAULT);
     this.tempDir = tempDir;
     this.packedBytesLength = packedBytesLength;
     this.longOrds = longOrds;
     this.singleValuePerDoc = singleValuePerDoc;
+    this.expectedCount = expectedCount;
   }
 
   /** Initializes on an already written/closed file, just so consumers can use {@link #getReader} to read the file. */
@@ -52,6 +61,7 @@ final class OfflinePointWriter implements PointWriter {
     closed = true;
     this.longOrds = longOrds;
     this.singleValuePerDoc = singleValuePerDoc;
+    this.expectedCount = 0;
   }
     
   @Override
@@ -68,18 +78,37 @@ final class OfflinePointWriter implements PointWriter {
     }
     out.writeInt(docID);
     count++;
+    assert expectedCount == 0 || count <= expectedCount;
   }
 
   @Override
   public PointReader getReader(long start, long length) throws IOException {
     assert closed;
     assert start + length <= count: "start=" + start + " length=" + length + " count=" + count;
+    assert expectedCount == 0 || count == expectedCount;
     return new OfflinePointReader(tempDir, out.getName(), packedBytesLength, start, length, longOrds, singleValuePerDoc);
   }
 
   @Override
+  public PointReader getSharedReader(long start, long length, List<Closeable> toCloseHeroically) throws IOException {
+    if (sharedReader == null) {
+      assert start == 0;
+      assert length <= count;
+      sharedReader = new OfflinePointReader(tempDir, out.getName(), packedBytesLength, 0, count, longOrds, singleValuePerDoc);
+      toCloseHeroically.add(sharedReader);
+      // Make sure the OfflinePointReader intends to verify its checksum:
+      assert sharedReader.in instanceof ChecksumIndexInput;
+    } else {
+      assert start == nextSharedRead: "start=" + start + " length=" + length + " nextSharedRead=" + nextSharedRead;
+    }
+    nextSharedRead += length;
+    return sharedReader;
+  }
+
+  @Override
   public void close() throws IOException {
     if (closed == false) {
+      assert sharedReader == null;
       try {
         CodecUtil.writeFooter(out);
       } finally {
@@ -91,6 +120,12 @@ final class OfflinePointWriter implements PointWriter {
 
   @Override
   public void destroy() throws IOException {
+    if (sharedReader != null) {
+      // At this point, the shared reader should have done a full sweep of the file:
+      assert nextSharedRead == count;
+      sharedReader.close();
+      sharedReader = null;
+    }
     tempDir.deleteFile(out.getName());
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0e189bca/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java b/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java
index 2f94967..d19f6e5 100644
--- a/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/util/bkd/PointWriter.java
@@ -19,6 +19,7 @@ package org.apache.lucene.util.bkd;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 /** Appends many points, and then at the end provides a {@link PointReader} to iterate
  *  those points.  This abstracts away whether we write to disk, or use simple arrays
@@ -30,6 +31,9 @@ interface PointWriter extends Closeable {
   /** Returns a {@link PointReader} iterator to step through all previously added points */
   PointReader getReader(long startPoint, long length) throws IOException;
 
+  /** Returns the single shared reader, used multiple times during the recursion, to read previously added points */
+  PointReader getSharedReader(long startPoint, long length, List<Closeable> toCloseHeroically) throws IOException;
+
   /** Removes any temp files behind this writer */
   void destroy() throws IOException;
 }
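
To illustrate the new interface method above, here is a hypothetical caller-side fragment, loosely
modeled on how BKDWriter uses it in this commit (the surrounding recursion, point consumption and
writer setup are elided, and PointReader/PointWriter are package-private, so this is a conceptual
sketch rather than standalone compilable code). Each step asks the same already-closed writer for a
shared reader over its contiguous sub-range, every shared reader is registered in toCloseHeroically
so it can be closed if an exception unwinds the recursion, and on the normal path destroy() closes
the reader, which by then has swept the whole temp file and verified its checksum, and deletes the file:

List<Closeable> toCloseHeroically = new ArrayList<>();
boolean success = false;
try {
  // Same writer, contiguous ranges: for an offline writer both calls return the one underlying
  // reader, so the temp file is swept exactly once across the whole recursion:
  PointReader left = writer.getSharedReader(0, leftCount, toCloseHeroically);
  // ... recurse into the left half, consuming exactly leftCount points via left.next() ...
  PointReader right = writer.getSharedReader(leftCount, count - leftCount, toCloseHeroically);
  // ... recurse into the right half, consuming the remaining points ...
  writer.destroy();  // closes the shared reader (checksum verified after the full sweep) and deletes the temp file
  success = true;
} finally {
  if (success == false) {
    // Mirrors the new cleanup in BKDWriter.build: close whatever shared readers were opened:
    IOUtils.closeWhileHandlingException(toCloseHeroically);
  }
}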