Posted to commits@lucene.apache.org by jp...@apache.org on 2014/12/29 14:44:45 UTC
svn commit: r1648342 - in
/lucene/dev/branches/branch_5x/lucene/backward-codecs/src:
java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java
test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java
Author: jpountz
Date: Mon Dec 29 13:44:45 2014
New Revision: 1648342
URL: http://svn.apache.org/r1648342
Log:
LUCENE-6142: Faster merges with Lucene41StoredFieldsFormat.
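Editor's note: the speed-up comes from the new getMergeInstance() added below. A merge-time reader decompresses each block of documents eagerly, once, and then serves every document in that block from the buffered bytes instead of re-decompressing per document. A minimal consumer-side sketch, assuming a StoredFieldsReader, a maxDoc and a StoredFieldVisitor obtained from the surrounding merge code (none of which are part of this patch):

    StoredFieldsReader merging = storedFieldsReader.getMergeInstance();
    for (int docID = 0; docID < maxDoc; ++docID) {
      // consecutive docIDs fall into the same block, which was decompressed once
      merging.visitDocument(docID, visitor);
    }

Random access through clone() keeps the previous lazy behaviour; only getMergeInstance() opts into eager decompression.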
Modified:
lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java
lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java
Modified: lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java?rev=1648342&r1=1648341&r2=1648342&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java (original)
+++ lucene/dev/branches/branch_5x/lucene/backward-codecs/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsReader.java Mon Dec 29 13:44:45 2014
@@ -20,12 +20,14 @@ package org.apache.lucene.codecs.lucene4
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collections;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
@@ -41,8 +43,10 @@ import org.apache.lucene.store.IOContext
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
+import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.IntsRef;
import org.apache.lucene.util.packed.PackedInts;
/**
@@ -88,12 +92,13 @@ final class Lucene41StoredFieldsReader e
private final int packedIntsVersion;
private final CompressionMode compressionMode;
private final Decompressor decompressor;
- private final BytesRef bytes;
private final int numDocs;
+ private final boolean merging;
+ private final BlockState state;
private boolean closed;
// used by clone
- private Lucene41StoredFieldsReader(Lucene41StoredFieldsReader reader) {
+ private Lucene41StoredFieldsReader(Lucene41StoredFieldsReader reader, boolean merging) {
this.version = reader.version;
this.fieldInfos = reader.fieldInfos;
this.fieldsStream = reader.fieldsStream.clone();
@@ -104,7 +109,8 @@ final class Lucene41StoredFieldsReader e
this.compressionMode = reader.compressionMode;
this.decompressor = reader.decompressor.clone();
this.numDocs = reader.numDocs;
- this.bytes = new BytesRef(reader.bytes.bytes.length);
+ this.merging = merging;
+ this.state = new BlockState();
this.closed = false;
}
@@ -162,7 +168,8 @@ final class Lucene41StoredFieldsReader e
}
packedIntsVersion = fieldsStream.readVInt();
decompressor = compressionMode.newDecompressor();
- this.bytes = new BytesRef();
+ this.merging = false;
+ this.state = new BlockState();
if (version >= VERSION_CHECKSUM) {
// NOTE: data file is too costly to verify checksum against all the bytes on open,
@@ -251,123 +258,250 @@ final class Lucene41StoredFieldsReader e
}
}
- @Override
- public void visitDocument(int docID, StoredFieldVisitor visitor)
- throws IOException {
- fieldsStream.seek(indexReader.getStartPointer(docID));
+ /**
+ * A serialized document; you need to decode its input in order to get an actual
+ * {@link Document}.
+ */
+ static class SerializedDocument {
- final int docBase = fieldsStream.readVInt();
- final int chunkDocs = fieldsStream.readVInt();
- if (docID < docBase
- || docID >= docBase + chunkDocs
- || docBase + chunkDocs > numDocs) {
- throw new CorruptIndexException("Corrupted: docID=" + docID
- + ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
- + ", numDocs=" + numDocs, fieldsStream);
- }
-
- final int numStoredFields, offset, length, totalLength;
- if (chunkDocs == 1) {
- numStoredFields = fieldsStream.readVInt();
- offset = 0;
- length = fieldsStream.readVInt();
- totalLength = length;
- } else {
- final int bitsPerStoredFields = fieldsStream.readVInt();
- if (bitsPerStoredFields == 0) {
- numStoredFields = fieldsStream.readVInt();
- } else if (bitsPerStoredFields > 31) {
- throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields, fieldsStream);
- } else {
- final long filePointer = fieldsStream.getFilePointer();
- final PackedInts.Reader reader = PackedInts.getDirectReaderNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields);
- numStoredFields = (int) (reader.get(docID - docBase));
- fieldsStream.seek(filePointer + PackedInts.Format.PACKED.byteCount(packedIntsVersion, chunkDocs, bitsPerStoredFields));
- }
+ // the serialized data
+ final DataInput in;
- final int bitsPerLength = fieldsStream.readVInt();
- if (bitsPerLength == 0) {
- length = fieldsStream.readVInt();
- offset = (docID - docBase) * length;
- totalLength = chunkDocs * length;
- } else if (bitsPerStoredFields > 31) {
- throw new CorruptIndexException("bitsPerLength=" + bitsPerLength, fieldsStream);
- } else {
- final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
- int off = 0;
- for (int i = 0; i < docID - docBase; ++i) {
- off += it.next();
- }
- offset = off;
- length = (int) it.next();
- off += length;
- for (int i = docID - docBase + 1; i < chunkDocs; ++i) {
- off += it.next();
- }
- totalLength = off;
- }
- }
+ // the number of bytes on which the document is encoded
+ final int length;
- if ((length == 0) != (numStoredFields == 0)) {
- throw new CorruptIndexException("length=" + length + ", numStoredFields=" + numStoredFields, fieldsStream);
+ // the number of stored fields
+ final int numStoredFields;
+
+ private SerializedDocument(DataInput in, int length, int numStoredFields) {
+ this.in = in;
+ this.length = length;
+ this.numStoredFields = numStoredFields;
}
- if (numStoredFields == 0) {
- // nothing to do
- return;
+
+ }
+
+ /**
+ * Keeps state about the current block of documents.
+ */
+ private class BlockState {
+
+ private int docBase, chunkDocs;
+
+ // whether the block has been sliced; this happens for large documents
+ private boolean sliced;
+
+ private int[] offsets = IntsRef.EMPTY_INTS;
+ private int[] numStoredFields = IntsRef.EMPTY_INTS;
+
+ // the start pointer at which you can read the compressed documents
+ private long startPointer;
+
+ private final BytesRef spare = new BytesRef();
+ private final BytesRef bytes = new BytesRef();
+
+ boolean contains(int docID) {
+ return docID >= docBase && docID < docBase + chunkDocs;
+ }
+
+ /**
+ * Reset this block so that it stores state for the block
+ * that contains the given doc id.
+ */
+ void reset(int docID) throws IOException {
+ boolean success = false;
+ try {
+ doReset(docID);
+ success = true;
+ } finally {
+ if (success == false) {
+ // if the read failed, set chunkDocs to 0 so that it does not
+ // contain any docs anymore and is not reused. This should help
+ // get consistent exceptions when trying to get several
+ // documents which are in the same corrupted block since it will
+ // force the header to be decoded again
+ chunkDocs = 0;
+ }
+ }
}
- final DataInput documentInput;
- if (version >= VERSION_BIG_CHUNKS && totalLength >= 2 * chunkSize) {
- assert chunkSize > 0;
- assert offset < chunkSize;
+ private void doReset(int docID) throws IOException {
+ docBase = fieldsStream.readVInt();
+ chunkDocs = fieldsStream.readVInt();
+ if (contains(docID) == false
+ || docBase + chunkDocs > numDocs) {
+ throw new CorruptIndexException("Corrupted: docID=" + docID
+ + ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
+ + ", numDocs=" + numDocs, fieldsStream);
+ }
- decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
- documentInput = new DataInput() {
+ offsets = ArrayUtil.grow(offsets, chunkDocs + 1);
+ numStoredFields = ArrayUtil.grow(numStoredFields, chunkDocs);
- int decompressed = bytes.length;
+ if (chunkDocs == 1) {
+ numStoredFields[0] = fieldsStream.readVInt();
+ offsets[1] = fieldsStream.readVInt();
+ } else {
+ // Number of stored fields per document
+ final int bitsPerStoredFields = fieldsStream.readVInt();
+ if (bitsPerStoredFields == 0) {
+ Arrays.fill(numStoredFields, 0, chunkDocs, fieldsStream.readVInt());
+ } else if (bitsPerStoredFields > 31) {
+ throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields, fieldsStream);
+ } else {
+ final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields, 1);
+ for (int i = 0; i < chunkDocs; ++i) {
+ numStoredFields[i] = (int) it.next();
+ }
+ }
- void fillBuffer() throws IOException {
- assert decompressed <= length;
- if (decompressed == length) {
- throw new EOFException();
+ // The stream encodes the length of each document and we decode
+ // it into a list of monotonically increasing offsets
+ final int bitsPerLength = fieldsStream.readVInt();
+ if (bitsPerLength == 0) {
+ final int length = fieldsStream.readVInt();
+ for (int i = 0; i < chunkDocs; ++i) {
+ offsets[1 + i] = (1 + i) * length;
+ }
+ } else if (bitsPerLength > 31) {
+ throw new CorruptIndexException("bitsPerLength=" + bitsPerLength, fieldsStream);
+ } else {
+ final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
+ for (int i = 0; i < chunkDocs; ++i) {
+ offsets[i + 1] = (int) it.next();
+ }
+ for (int i = 0; i < chunkDocs; ++i) {
+ offsets[i + 1] += offsets[i];
}
- final int toDecompress = Math.min(length - decompressed, chunkSize);
- decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
- decompressed += toDecompress;
}
- @Override
- public byte readByte() throws IOException {
- if (bytes.length == 0) {
- fillBuffer();
+ // Additional validation: only the empty document has a serialized length of 0
+ for (int i = 0; i < chunkDocs; ++i) {
+ final int len = offsets[i + 1] - offsets[i];
+ final int storedFields = numStoredFields[i];
+ if ((len == 0) != (storedFields == 0)) {
+ throw new CorruptIndexException("length=" + len + ", numStoredFields=" + storedFields, fieldsStream);
}
- --bytes.length;
- return bytes.bytes[bytes.offset++];
}
+ }
+ sliced = version >= VERSION_BIG_CHUNKS && offsets[chunkDocs] >= 2 * chunkSize;
- @Override
- public void readBytes(byte[] b, int offset, int len) throws IOException {
- while (len > bytes.length) {
- System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
- len -= bytes.length;
- offset += bytes.length;
- fillBuffer();
+ startPointer = fieldsStream.getFilePointer();
+
+ if (merging) {
+ final int totalLength = offsets[chunkDocs];
+ // decompress eagerly
+ if (sliced) {
+ bytes.offset = bytes.length = 0;
+ for (int decompressed = 0; decompressed < totalLength; ) {
+ final int toDecompress = Math.min(totalLength - decompressed, chunkSize);
+ decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
+ bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
+ System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);
+ bytes.length += spare.length;
+ decompressed += toDecompress;
}
- System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
- bytes.offset += len;
- bytes.length -= len;
+ } else {
+ decompressor.decompress(fieldsStream, totalLength, 0, totalLength, bytes);
+ }
+ if (bytes.length != totalLength) {
+ throw new CorruptIndexException("Corrupted: expected chunk size = " + totalLength + ", got " + bytes.length, fieldsStream);
}
+ }
+ }
+
+ /**
+ * Get the serialized representation of the given docID. This docID has
+ * to be contained in the current block.
+ */
+ SerializedDocument document(int docID) throws IOException {
+ if (contains(docID) == false) {
+ throw new IllegalArgumentException();
+ }
+
+ final int index = docID - docBase;
+ final int offset = offsets[index];
+ final int length = offsets[index+1] - offset;
+ final int totalLength = offsets[chunkDocs];
+ final int numStoredFields = this.numStoredFields[index];
+
+ fieldsStream.seek(startPointer);
+
+ final DataInput documentInput;
+ if (length == 0) {
+ // empty
+ documentInput = new ByteArrayDataInput();
+ } else if (merging) {
+ // already decompressed
+ documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
+ } else if (sliced) {
+ decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
+ documentInput = new DataInput() {
+
+ int decompressed = bytes.length;
+
+ void fillBuffer() throws IOException {
+ assert decompressed <= length;
+ if (decompressed == length) {
+ throw new EOFException();
+ }
+ final int toDecompress = Math.min(length - decompressed, chunkSize);
+ decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
+ decompressed += toDecompress;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ if (bytes.length == 0) {
+ fillBuffer();
+ }
+ --bytes.length;
+ return bytes.bytes[bytes.offset++];
+ }
+
+ @Override
+ public void readBytes(byte[] b, int offset, int len) throws IOException {
+ while (len > bytes.length) {
+ System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
+ len -= bytes.length;
+ offset += bytes.length;
+ fillBuffer();
+ }
+ System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
+ bytes.offset += len;
+ bytes.length -= len;
+ }
+
+ };
+ } else {
+ final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
+ decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
+ assert bytes.length == length;
+ documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
+ }
- };
- } else {
- final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
- decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
- assert bytes.length == length;
- documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
+ return new SerializedDocument(documentInput, length, numStoredFields);
}
- for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) {
- final long infoAndBits = documentInput.readVLong();
+ }
+
+ SerializedDocument document(int docID) throws IOException {
+ if (state.contains(docID) == false) {
+ fieldsStream.seek(indexReader.getStartPointer(docID));
+ state.reset(docID);
+ }
+ assert state.contains(docID);
+ return state.document(docID);
+ }
+
+ @Override
+ public void visitDocument(int docID, StoredFieldVisitor visitor)
+ throws IOException {
+
+ final SerializedDocument doc = document(docID);
+
+ for (int fieldIDX = 0; fieldIDX < doc.numStoredFields; fieldIDX++) {
+ final long infoAndBits = doc.in.readVLong();
final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);
@@ -376,10 +510,10 @@ final class Lucene41StoredFieldsReader e
switch(visitor.needsField(fieldInfo)) {
case YES:
- readField(documentInput, visitor, fieldInfo, bits);
+ readField(doc.in, visitor, fieldInfo, bits);
break;
case NO:
- skipField(documentInput, bits);
+ skipField(doc.in, bits);
break;
case STOP:
return;
@@ -390,7 +524,13 @@ final class Lucene41StoredFieldsReader e
@Override
public StoredFieldsReader clone() {
ensureOpen();
- return new Lucene41StoredFieldsReader(this);
+ return new Lucene41StoredFieldsReader(this, false);
+ }
+
+ @Override
+ public StoredFieldsReader getMergeInstance() {
+ ensureOpen();
+ return new Lucene41StoredFieldsReader(this, true);
}
@Override
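
Editor's note: the offsets[] array that BlockState.doReset builds above is a prefix sum over the per-document lengths read from the stream, so doc i occupies bytes [offsets[i], offsets[i+1]) of the decompressed block and offsets[chunkDocs] is the total length used to decide whether the block is sliced. A standalone sketch with made-up lengths, not part of the patch:

    class OffsetsDemo {
      public static void main(String[] args) {
        int[] lengths = {16, 32768, 16};   // hypothetical serialized lengths of 3 docs in one chunk
        int[] offsets = new int[lengths.length + 1];
        for (int i = 0; i < lengths.length; ++i) {
          offsets[i + 1] = offsets[i] + lengths[i];
        }
        // offsets = [0, 16, 32784, 32800]; 32800 >= 2 * chunkSize (32768), so this block would be sliced
        System.out.println(java.util.Arrays.toString(offsets));
      }
    }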
Modified: lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java?rev=1648342&r1=1648341&r2=1648342&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java (original)
+++ lucene/dev/branches/branch_5x/lucene/backward-codecs/src/test/org/apache/lucene/codecs/lucene41/TestLucene41StoredFieldsFormat.java Mon Dec 29 13:44:45 2014
@@ -17,12 +17,67 @@ package org.apache.lucene.codecs.lucene4
* limitations under the License.
*/
+import java.io.IOException;
+
+import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
import org.apache.lucene.index.BaseStoredFieldsFormatTestCase;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.TestUtil;
+
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
public class TestLucene41StoredFieldsFormat extends BaseStoredFieldsFormatTestCase {
@Override
protected Codec getCodec() {
return new Lucene41RWCodec();
}
+
+ public void testMergeLargeDocuments() throws IOException {
+ // this format has different logic for blocks that are greater than 32KB (2 * chunkSize),
+ // so we need to explicitly test it.
+ final int numDocs = atLeast(200);
+ Directory dir = newDirectory();
+ RandomIndexWriter w = new RandomIndexWriter(random(), dir, newIndexWriterConfig(new MockAnalyzer(random())).setMergePolicy(NoMergePolicy.INSTANCE));
+ for (int i = 0; i < numDocs; ++i) {
+ Document doc = new Document();
+ doc.add(new StringField("delete", random().nextBoolean() ? "yes" : "no", Store.YES));
+ final int length;
+ switch (random().nextInt(4)) {
+ case 0:
+ length = 16; // small docs so that some large blocks contain several docs (a small doc followed by a large one)
+ break;
+ case 1:
+ length = 1 << 14; // a document that will cause the block NOT to be sliced to make sure that we make the distinction correctly
+ break;
+ case 2:
+ length = 1 << 15; // a document that will make the block sliced into 2 slices
+ break;
+ case 3:
+ length = 1 << 17; // a document that will make the block sliced into more than 2 slices
+ break;
+ default:
+ throw new AssertionError();
+ }
+ doc.add(new StoredField("f", RandomStrings.randomAsciiOfLength(random(), length)));
+ w.addDocument(doc);
+ }
+ w.deleteDocuments(new Term("delete", "yes"));
+ w.commit();
+ w.close();
+
+ w = new RandomIndexWriter(random(), dir);
+ w.forceMerge(TestUtil.nextInt(random(), 1, 3));
+ w.commit();
+ w.close();
+ TestUtil.checkIndex(dir);
+ dir.close();
+ }
}
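
Editor's note on the length choices in the test above, assuming chunkSize = 1 << 14 (16KB), which is what the 32KB = 2 * chunkSize comment implies: a 1 << 14 byte document keeps the block total below 2 * chunkSize (not sliced, modulo neighbouring docs), a 1 << 15 byte document reaches the 32768-byte threshold (decompressed in two chunkSize slices), and a 1 << 17 byte document needs roughly eight slices. The exact counts depend on the other documents packed into the same block.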