You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jp...@apache.org on 2014/12/29 14:44:45 UTC

svn commit: r1648342 - in /lucene/dev/branches/branch_5x/lucene/backward-codecs/src: java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java

Author: jpountz
Date: Mon Dec 29 13:44:45 2014
New Revision: 1648342

URL: http://svn.apache.org/r1648342
Log:
LUCENE-6142: Faster merges with Lucene41StoredFieldsFormat.

Modified:
    lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java
    lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java

Modified: lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java?rev=1648342&r1=1648341&r2=1648342&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java (original)
+++ lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java Mon Dec 29 13:44:45 2014
@@ -20,12 +20,14 @@ package org.apache.lucene.codecs.lucene4
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collections;
 
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.document.Document;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
@@ -41,8 +43,10 @@ import org.apache.lucene.store.IOContext
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.Accountables;
+import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.IntsRef;
 import org.apache.lucene.util.packed.PackedInts;
 
 /**
@@ -88,12 +92,13 @@ final class Lucene41StoredFieldsReader e
   private final int packedIntsVersion;
   private final CompressionMode compressionMode;
   private final Decompressor decompressor;
-  private final BytesRef bytes;
   private final int numDocs;
+  private final boolean merging;
+  private final BlockState state;
   private boolean closed;
 
   // used by clone
-  private Lucene41StoredFieldsReader(Lucene41StoredFieldsReader reader) {
+  private Lucene41StoredFieldsReader(Lucene41StoredFieldsReader reader, boolean merging) {
     this.version = reader.version;
     this.fieldInfos = reader.fieldInfos;
     this.fieldsStream = reader.fieldsStream.clone();
@@ -104,7 +109,8 @@ final class Lucene41StoredFieldsReader e
     this.compressionMode = reader.compressionMode;
     this.decompressor = reader.decompressor.clone();
     this.numDocs = reader.numDocs;
-    this.bytes = new BytesRef(reader.bytes.bytes.length);
+    this.merging = merging;
+    this.state = new BlockState();
     this.closed = false;
   }
 
@@ -162,7 +168,8 @@ final class Lucene41StoredFieldsReader e
       }
       packedIntsVersion = fieldsStream.readVInt();
       decompressor = compressionMode.newDecompressor();
-      this.bytes = new BytesRef();
+      this.merging = false;
+      this.state = new BlockState();
       
       if (version >= VERSION_CHECKSUM) {
         // NOTE: data file is too costly to verify checksum against all the bytes on open,
@@ -251,123 +258,250 @@ final class Lucene41StoredFieldsReader e
     }
   }
 
-  @Override
-  public void visitDocument(int docID, StoredFieldVisitor visitor)
-      throws IOException {
-    fieldsStream.seek(indexReader.getStartPointer(docID));
+  /**
+   * A serialized document; you need to decode its input in order to get an actual
+   * {@link Document}.
+   */
+  static class SerializedDocument {
 
-    final int docBase = fieldsStream.readVInt();
-    final int chunkDocs = fieldsStream.readVInt();
-    if (docID < docBase
-        || docID >= docBase + chunkDocs
-        || docBase + chunkDocs > numDocs) {
-      throw new CorruptIndexException("Corrupted: docID=" + docID
-          + ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
-          + ", numDocs=" + numDocs, fieldsStream);
-    }
-
-    final int numStoredFields, offset, length, totalLength;
-    if (chunkDocs == 1) {
-      numStoredFields = fieldsStream.readVInt();
-      offset = 0;
-      length = fieldsStream.readVInt();
-      totalLength = length;
-    } else {
-      final int bitsPerStoredFields = fieldsStream.readVInt();
-      if (bitsPerStoredFields == 0) {
-        numStoredFields = fieldsStream.readVInt();
-      } else if (bitsPerStoredFields > 31) {
-        throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields, fieldsStream);
-      } else {
-        final long filePointer = fieldsStream.getFilePointer();
-        final PackedInts.Reader reader = PackedInts.getDirectReaderNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields);
-        numStoredFields = (int) (reader.get(docID - docBase));
-        fieldsStream.seek(filePointer + PackedInts.Format.PACKED.byteCount(packedIntsVersion, chunkDocs, bitsPerStoredFields));
-      }
+    // the serialized data
+    final DataInput in;
 
-      final int bitsPerLength = fieldsStream.readVInt();
-      if (bitsPerLength == 0) {
-        length = fieldsStream.readVInt();
-        offset = (docID - docBase) * length;
-        totalLength = chunkDocs * length;
-      } else if (bitsPerStoredFields > 31) {
-        throw new CorruptIndexException("bitsPerLength=" + bitsPerLength, fieldsStream);
-      } else {
-        final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
-        int off = 0;
-        for (int i = 0; i < docID - docBase; ++i) {
-          off += it.next();
-        }
-        offset = off;
-        length = (int) it.next();
-        off += length;
-        for (int i = docID - docBase + 1; i < chunkDocs; ++i) {
-          off += it.next();
-        }
-        totalLength = off;
-      }
-    }
+    // the number of bytes on which the document is encoded
+    final int length;
 
-    if ((length == 0) != (numStoredFields == 0)) {
-      throw new CorruptIndexException("length=" + length + ", numStoredFields=" + numStoredFields, fieldsStream);
+    // the number of stored fields
+    final int numStoredFields;
+
+    private SerializedDocument(DataInput in, int length, int numStoredFields) {
+      this.in = in;
+      this.length = length;
+      this.numStoredFields = numStoredFields;
     }
-    if (numStoredFields == 0) {
-      // nothing to do
-      return;
+
+  }
+
+  /**
+   * Keeps state about the current block of documents.
+   */
+  private class BlockState {
+
+    private int docBase, chunkDocs;
+
+    // whether the block has been sliced, this happens for large documents
+    private boolean sliced;
+
+    private int[] offsets = IntsRef.EMPTY_INTS;
+    private int[] numStoredFields = IntsRef.EMPTY_INTS;
+
+    // the start pointer at which you can read the compressed documents
+    private long startPointer;
+
+    private final BytesRef spare = new BytesRef();
+    private final BytesRef bytes = new BytesRef();
+
+    boolean contains(int docID) {
+      return docID >= docBase && docID < docBase + chunkDocs;
+    }
+
+    /**
+     * Reset this block so that it stores state for the block
+     * that contains the given doc id.
+     */
+    void reset(int docID) throws IOException {
+      boolean success = false;
+      try {
+        doReset(docID);
+        success = true;
+      } finally {
+        if (success == false) {
+          // if the read failed, set chunkDocs to 0 so that it does not
+          // contain any docs anymore and is not reused. This should help
+          // get consistent exceptions when trying to get several
+          // documents which are in the same corrupted block since it will
+          // force the header to be decoded again
+          chunkDocs = 0;
+        }
+      }
     }
 
-    final DataInput documentInput;
-    if (version >= VERSION_BIG_CHUNKS && totalLength >= 2 * chunkSize) {
-      assert chunkSize > 0;
-      assert offset < chunkSize;
+    private void doReset(int docID) throws IOException {
+      docBase = fieldsStream.readVInt();
+      chunkDocs = fieldsStream.readVInt();
+      if (contains(docID) == false
+          || docBase + chunkDocs > numDocs) {
+        throw new CorruptIndexException("Corrupted: docID=" + docID
+            + ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
+            + ", numDocs=" + numDocs, fieldsStream);
+      }
 
-      decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
-      documentInput = new DataInput() {
+      offsets = ArrayUtil.grow(offsets, chunkDocs + 1);
+      numStoredFields = ArrayUtil.grow(numStoredFields, chunkDocs);
 
-        int decompressed = bytes.length;
+      if (chunkDocs == 1) {
+        numStoredFields[0] = fieldsStream.readVInt();
+        offsets[1] = fieldsStream.readVInt();
+      } else {
+        // Number of stored fields per document
+        final int bitsPerStoredFields = fieldsStream.readVInt();
+        if (bitsPerStoredFields == 0) {
+          Arrays.fill(numStoredFields, 0, chunkDocs, fieldsStream.readVInt());
+        } else if (bitsPerStoredFields > 31) {
+          throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields, fieldsStream);
+        } else {
+          final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields, 1);
+          for (int i = 0; i < chunkDocs; ++i) {
+            numStoredFields[i] = (int) it.next();
+          }
+        }
 
-        void fillBuffer() throws IOException {
-          assert decompressed <= length;
-          if (decompressed == length) {
-            throw new EOFException();
+        // The stream encodes the length of each document and we decode
+        // it into a list of monotonically increasing offsets
+        final int bitsPerLength = fieldsStream.readVInt();
+        if (bitsPerLength == 0) {
+          final int length = fieldsStream.readVInt();
+          for (int i = 0; i < chunkDocs; ++i) {
+            offsets[1 + i] = (1 + i) * length;
+          }
+        } else if (bitsPerStoredFields > 31) {
+          throw new CorruptIndexException("bitsPerLength=" + bitsPerLength, fieldsStream);
+        } else {
+          final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
+          for (int i = 0; i < chunkDocs; ++i) {
+            offsets[i + 1] = (int) it.next();
+          }
+          for (int i = 0; i < chunkDocs; ++i) {
+            offsets[i + 1] += offsets[i];
           }
-          final int toDecompress = Math.min(length - decompressed, chunkSize);
-          decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
-          decompressed += toDecompress;
         }
 
-        @Override
-        public byte readByte() throws IOException {
-          if (bytes.length == 0) {
-            fillBuffer();
+        // Additional validation: only the empty document has a serialized length of 0
+        for (int i = 0; i < chunkDocs; ++i) {
+          final int len = offsets[i + 1] - offsets[i];
+          final int storedFields = numStoredFields[i];
+          if ((len == 0) != (storedFields == 0)) {
+            throw new CorruptIndexException("length=" + len + ", numStoredFields=" + storedFields, fieldsStream);
           }
-          --bytes.length;
-          return bytes.bytes[bytes.offset++];
         }
+      }
+      sliced = version >= VERSION_BIG_CHUNKS && offsets[chunkDocs] >= 2 * chunkSize;
 
-        @Override
-        public void readBytes(byte[] b, int offset, int len) throws IOException {
-          while (len > bytes.length) {
-            System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
-            len -= bytes.length;
-            offset += bytes.length;
-            fillBuffer();
+      startPointer = fieldsStream.getFilePointer();
+
+      if (merging) {
+        final int totalLength = offsets[chunkDocs];
+        // decompress eagerly
+        if (sliced) {
+          bytes.offset = bytes.length = 0;
+          for (int decompressed = 0; decompressed < totalLength; ) {
+            final int toDecompress = Math.min(totalLength - decompressed, chunkSize);
+            decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
+            bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
+            System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);
+            bytes.length += spare.length;
+            decompressed += toDecompress;
           }
-          System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
-          bytes.offset += len;
-          bytes.length -= len;
+        } else {
+          decompressor.decompress(fieldsStream, totalLength, 0, totalLength, bytes);
+        }
+        if (bytes.length != totalLength) {
+          throw new CorruptIndexException("Corrupted: expected chunk size = " + totalLength + ", got " + bytes.length, fieldsStream);
         }
+      }
+    }
+
+    /**
+     * Get the serialized representation of the given docID. This docID has
+     * to be contained in the current block.
+     */
+    SerializedDocument document(int docID) throws IOException {
+      if (contains(docID) == false) {
+        throw new IllegalArgumentException();
+      }
+
+      final int index = docID - docBase;
+      final int offset = offsets[index];
+      final int length = offsets[index+1] - offset;
+      final int totalLength = offsets[chunkDocs];
+      final int numStoredFields = this.numStoredFields[index];
+
+      fieldsStream.seek(startPointer);
+
+      final DataInput documentInput;
+      if (length == 0) {
+        // empty
+        documentInput = new ByteArrayDataInput();
+      } else if (merging) {
+        // already decompressed
+        documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
+      } else if (sliced) {
+        decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
+        documentInput = new DataInput() {
+
+          int decompressed = bytes.length;
+
+          void fillBuffer() throws IOException {
+            assert decompressed <= length;
+            if (decompressed == length) {
+              throw new EOFException();
+            }
+            final int toDecompress = Math.min(length - decompressed, chunkSize);
+            decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
+            decompressed += toDecompress;
+          }
+
+          @Override
+          public byte readByte() throws IOException {
+            if (bytes.length == 0) {
+              fillBuffer();
+            }
+            --bytes.length;
+            return bytes.bytes[bytes.offset++];
+          }
+
+          @Override
+          public void readBytes(byte[] b, int offset, int len) throws IOException {
+            while (len > bytes.length) {
+              System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
+              len -= bytes.length;
+              offset += bytes.length;
+              fillBuffer();
+            }
+            System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
+            bytes.offset += len;
+            bytes.length -= len;
+          }
+
+        };
+      } else {
+        final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
+        decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
+        assert bytes.length == length;
+        documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
+      }
 
-      };
-    } else {
-      final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
-      decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
-      assert bytes.length == length;
-      documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
+      return new SerializedDocument(documentInput, length, numStoredFields);
     }
 
-    for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) {
-      final long infoAndBits = documentInput.readVLong();
+  }
+
+  SerializedDocument document(int docID) throws IOException {
+    if (state.contains(docID) == false) {
+      fieldsStream.seek(indexReader.getStartPointer(docID));
+      state.reset(docID);
+    }
+    assert state.contains(docID);
+    return state.document(docID);
+  }
+
+  @Override
+  public void visitDocument(int docID, StoredFieldVisitor visitor)
+      throws IOException {
+
+    final SerializedDocument doc = document(docID);
+
+    for (int fieldIDX = 0; fieldIDX < doc.numStoredFields; fieldIDX++) {
+      final long infoAndBits = doc.in.readVLong();
       final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
       final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);
 
@@ -376,10 +510,10 @@ final class Lucene41StoredFieldsReader e
 
       switch(visitor.needsField(fieldInfo)) {
         case YES:
-          readField(documentInput, visitor, fieldInfo, bits);
+          readField(doc.in, visitor, fieldInfo, bits);
           break;
         case NO:
-          skipField(documentInput, bits);
+          skipField(doc.in, bits);
           break;
         case STOP:
           return;
@@ -390,7 +524,13 @@ final class Lucene41StoredFieldsReader e
   @Override
   public StoredFieldsReader clone() {
     ensureOpen();
-    return new Lucene41StoredFieldsReader(this);
+    return new Lucene41StoredFieldsReader(this, false);
+  }
+
+  @Override
+  public StoredFieldsReader getMergeInstance() {
+    ensureOpen();
+    return new Lucene41StoredFieldsReader(this, true);
   }
 
   @Override

Modified: lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java?rev=1648342&r1=1648341&r2=1648342&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java (original)
+++ lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java Mon Dec 29 13:44:45 2014
@@ -17,12 +17,67 @@ package org.apache.lucene.codecs.lucene4
  * limitations under the License.
  */
 
+import java.io.IOException;
+
+import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.BaseStoredFieldsFormatTestCase;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.TestUtil;
+
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
 
 public class TestLucene41StoredFieldsFormat extends BaseStoredFieldsFormatTestCase {
   @Override
   protected Codec getCodec() {
     return new Lucene41RWCodec();
   }
+
+  public void testMergeLargeDocuments() throws IOException {
+    // this format has a different logic for blocks that are greater than 32KB (2 * chunkSize)
+    // so we need to explicitly test it.
+    final int numDocs = atLeast(200);
+    Directory dir = newDirectory();
+    RandomIndexWriter w = new RandomIndexWriter(random(), dir, newIndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(NoMergePolicy.INSTANCE));
+    for (int i = 0; i < numDocs; ++i) {
+      Document doc = new Document();
+      doc.add(new StringField("delete", random().nextBoolean() ? "yes" : "no", Store.YES));
+      final int length;
+      switch (random().nextInt(4)) {
+        case 0:
+          length = 16; // small docs so that some large blocks contain several docs (a small doc followed by a large one)
+          break;
+        case 1:
+          length = 1 << 14; // a document that will cause the block NOT to be sliced to make sure that we make the distinction correctly
+          break;
+        case 2:
+          length = 1 << 15; // a document that will make the block sliced into 2 slices
+          break;
+        case 3:
+          length = 1 << 17; // a document that will make the block sliced into more than 2 slices
+          break;
+        default:
+          throw new AssertionError();
+      }
+      doc.add(new StoredField("f", RandomStrings.randomAsciiOfLength(random(), length)));
+      w.addDocument(doc);
+    }
+    w.deleteDocuments(new Term("delete", "yes"));
+    w.commit();
+    w.close();
+
+    w = new RandomIndexWriter(random(), dir);
+    w.forceMerge(TestUtil.nextInt(random(), 1, 3));
+    w.commit();
+    w.close();
+    TestUtil.checkIndex(dir);
+    dir.close();
+  }
 }