Posted to commits@lucene.apache.org by jp...@apache.org on 2014/12/18 10:13:22 UTC
svn commit: r1646413 - in /lucene/dev/trunk/lucene:
core/src/java/org/apache/lucene/codecs/compressing/
test-framework/src/java/org/apache/lucene/codecs/compressing/
test-framework/src/java/org/apache/lucene/index/
Author: jpountz
Date: Thu Dec 18 09:13:21 2014
New Revision: 1646413
URL: http://svn.apache.org/r1646413
Log:
LUCENE-6115: Add getMergeInstance to CompressingStoredFieldsReader.
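The new getMergeInstance() returns a reader specialization that decompresses each block eagerly, so sequential access during a merge pays the decompression cost once per chunk rather than once per document. As a minimal, hypothetical sketch (the helper class and loop below are illustrative assumptions, not part of this commit), a consumer that reads stored fields in docID order would use it like this:

    import java.io.IOException;
    import org.apache.lucene.codecs.StoredFieldsReader;
    import org.apache.lucene.document.DocumentStoredFieldVisitor;

    final class MergeReadSketch {
      // Visit every document through the merge-optimized instance; when docIDs
      // arrive in order, each chunk is decompressed only once.
      static void visitAll(StoredFieldsReader reader, int maxDoc) throws IOException {
        final StoredFieldsReader merging = reader.getMergeInstance();
        for (int docID = 0; docID < maxDoc; docID++) {
          final DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
          merging.visitDocument(docID, visitor);
          // visitor.getDocument() now holds the stored fields of docID
        }
      }
    }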
Modified:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java
lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/BaseStoredFieldsFormatTestCase.java
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java?rev=1646413&r1=1646412&r2=1646413&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java Thu Dec 18 09:13:21 2014
@@ -46,6 +46,7 @@ import java.util.Collections;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
+import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
@@ -53,7 +54,6 @@ import org.apache.lucene.index.IndexFile
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.DataInput;
@@ -66,6 +66,7 @@ import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.IntsRef;
import org.apache.lucene.util.packed.PackedInts;
/**
@@ -74,9 +75,6 @@ import org.apache.lucene.util.packed.Pac
*/
public final class CompressingStoredFieldsReader extends StoredFieldsReader {
- // Do not reuse the decompression buffer when there is more than 32kb to decompress
- private static final int BUFFER_REUSE_THRESHOLD = 1 << 15;
-
private final int version;
private final FieldInfos fieldInfos;
private final CompressingStoredFieldsIndexReader indexReader;
@@ -86,12 +84,13 @@ public final class CompressingStoredFiel
private final int packedIntsVersion;
private final CompressionMode compressionMode;
private final Decompressor decompressor;
- private final BytesRef bytes;
private final int numDocs;
+ private final boolean merging;
+ private final BlockState state;
private boolean closed;
// used by clone
- private CompressingStoredFieldsReader(CompressingStoredFieldsReader reader) {
+ private CompressingStoredFieldsReader(CompressingStoredFieldsReader reader, boolean merging) {
this.version = reader.version;
this.fieldInfos = reader.fieldInfos;
this.fieldsStream = reader.fieldsStream.clone();
@@ -102,7 +101,8 @@ public final class CompressingStoredFiel
this.compressionMode = reader.compressionMode;
this.decompressor = reader.decompressor.clone();
this.numDocs = reader.numDocs;
- this.bytes = new BytesRef(reader.bytes.bytes.length);
+ this.merging = merging;
+ this.state = new BlockState();
this.closed = false;
}
@@ -157,7 +157,8 @@ public final class CompressingStoredFiel
chunkSize = fieldsStream.readVInt();
packedIntsVersion = fieldsStream.readVInt();
decompressor = compressionMode.newDecompressor();
- this.bytes = new BytesRef();
+ this.merging = false;
+ this.state = new BlockState();
// NOTE: data file is too costly to verify checksum against all the bytes on open,
// but for now we at least verify proper structure of the checksum footer: which looks
@@ -324,227 +325,94 @@ public final class CompressingStoredFiel
return l;
}
- @Override
- public void visitDocument(int docID, StoredFieldVisitor visitor)
- throws IOException {
- fieldsStream.seek(indexReader.getStartPointer(docID));
-
- final int docBase = fieldsStream.readVInt();
- final int chunkDocs = fieldsStream.readVInt();
- if (docID < docBase
- || docID >= docBase + chunkDocs
- || docBase + chunkDocs > numDocs) {
- throw new CorruptIndexException("Corrupted: docID=" + docID
- + ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
- + ", numDocs=" + numDocs, fieldsStream);
- }
-
- final int numStoredFields, offset, length, totalLength;
- if (chunkDocs == 1) {
- numStoredFields = fieldsStream.readVInt();
- offset = 0;
- length = fieldsStream.readVInt();
- totalLength = length;
- } else {
- final int bitsPerStoredFields = fieldsStream.readVInt();
- if (bitsPerStoredFields == 0) {
- numStoredFields = fieldsStream.readVInt();
- } else if (bitsPerStoredFields > 31) {
- throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields, fieldsStream);
- } else {
- final long filePointer = fieldsStream.getFilePointer();
- final PackedInts.Reader reader = PackedInts.getDirectReaderNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields);
- numStoredFields = (int) (reader.get(docID - docBase));
- fieldsStream.seek(filePointer + PackedInts.Format.PACKED.byteCount(packedIntsVersion, chunkDocs, bitsPerStoredFields));
- }
-
- final int bitsPerLength = fieldsStream.readVInt();
- if (bitsPerLength == 0) {
- length = fieldsStream.readVInt();
- offset = (docID - docBase) * length;
- totalLength = chunkDocs * length;
- } else if (bitsPerStoredFields > 31) {
- throw new CorruptIndexException("bitsPerLength=" + bitsPerLength, fieldsStream);
- } else {
- final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
- int off = 0;
- for (int i = 0; i < docID - docBase; ++i) {
- off += it.next();
- }
- offset = off;
- length = (int) it.next();
- off += length;
- for (int i = docID - docBase + 1; i < chunkDocs; ++i) {
- off += it.next();
- }
- totalLength = off;
- }
- }
-
- if ((length == 0) != (numStoredFields == 0)) {
- throw new CorruptIndexException("length=" + length + ", numStoredFields=" + numStoredFields, fieldsStream);
- }
- if (numStoredFields == 0) {
- // nothing to do
- return;
- }
-
- final DataInput documentInput;
- if (totalLength >= 2 * chunkSize) {
- assert chunkSize > 0;
- assert offset < chunkSize;
+ /**
+ * A serialized document; you need to decode its input in order to get an actual
+ * {@link Document}.
+ */
+ static class SerializedDocument {
- decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
- documentInput = new DataInput() {
+ // the serialized data
+ final DataInput in;
- int decompressed = bytes.length;
+ // the number of bytes on which the document is encoded
+ final int length;
- void fillBuffer() throws IOException {
- assert decompressed <= length;
- if (decompressed == length) {
- throw new EOFException();
- }
- final int toDecompress = Math.min(length - decompressed, chunkSize);
- decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
- decompressed += toDecompress;
- }
+ // the number of stored fields
+ final int numStoredFields;
- @Override
- public byte readByte() throws IOException {
- if (bytes.length == 0) {
- fillBuffer();
- }
- --bytes.length;
- return bytes.bytes[bytes.offset++];
- }
-
- @Override
- public void readBytes(byte[] b, int offset, int len) throws IOException {
- while (len > bytes.length) {
- System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
- len -= bytes.length;
- offset += bytes.length;
- fillBuffer();
- }
- System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
- bytes.offset += len;
- bytes.length -= len;
- }
-
- };
- } else {
- final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
- decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
- assert bytes.length == length;
- documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
+ private SerializedDocument(DataInput in, int length, int numStoredFields) {
+ this.in = in;
+ this.length = length;
+ this.numStoredFields = numStoredFields;
}
- for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) {
- final long infoAndBits = documentInput.readVLong();
- final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
- final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);
-
- final int bits = (int) (infoAndBits & TYPE_MASK);
- assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits);
-
- switch(visitor.needsField(fieldInfo)) {
- case YES:
- readField(documentInput, visitor, fieldInfo, bits);
- break;
- case NO:
- skipField(documentInput, bits);
- break;
- case STOP:
- return;
- }
- }
}
- @Override
- public StoredFieldsReader clone() {
- ensureOpen();
- return new CompressingStoredFieldsReader(this);
- }
+ /**
+ * Keeps state about the current block of documents.
+ */
+ private class BlockState {
- int getVersion() {
- return version;
- }
+ private int docBase, chunkDocs;
- CompressionMode getCompressionMode() {
- return compressionMode;
- }
+ // whether the block has been sliced; this happens for large documents
+ private boolean sliced;
- int getChunkSize() {
- return chunkSize;
- }
+ private int[] offsets = IntsRef.EMPTY_INTS;
+ private int[] numStoredFields = IntsRef.EMPTY_INTS;
- ChunkIterator chunkIterator(int startDocID) throws IOException {
- ensureOpen();
- return new ChunkIterator(startDocID);
- }
+ // the start pointer at which you can read the compressed documents
+ private long startPointer;
- final class ChunkIterator {
+ private final BytesRef spare = new BytesRef();
+ private final BytesRef bytes = new BytesRef();
- final ChecksumIndexInput fieldsStream;
- final BytesRef spare;
- final BytesRef bytes;
- int docBase;
- int chunkDocs;
- int[] numStoredFields;
- int[] lengths;
-
- private ChunkIterator(int startDocId) throws IOException {
- this.docBase = -1;
- bytes = new BytesRef();
- spare = new BytesRef();
- numStoredFields = new int[1];
- lengths = new int[1];
-
- IndexInput in = CompressingStoredFieldsReader.this.fieldsStream;
- in.seek(0);
- fieldsStream = new BufferedChecksumIndexInput(in);
- fieldsStream.seek(indexReader.getStartPointer(startDocId));
+ boolean contains(int docID) {
+ return docID >= docBase && docID < docBase + chunkDocs;
}
/**
- * Return the decompressed size of the chunk
+ * Reset this block so that it stores state for the block
+ * that contains the given doc id.
*/
- int chunkSize() {
- int sum = 0;
- for (int i = 0; i < chunkDocs; ++i) {
- sum += lengths[i];
+ void reset(int docID) throws IOException {
+ boolean success = false;
+ try {
+ doReset(docID);
+ success = true;
+ } finally {
+ if (success == false) {
+ // if the read failed, set chunkDocs to 0 so that it does not
+ // contain any docs anymore and is not reused. This should help
+ // get consistent exceptions when trying to get several
+ // documents which are in the same corrupted block since it will
+ // force the header to be decoded again
+ chunkDocs = 0;
+ }
}
- return sum;
}
- /**
- * Go to the chunk containing the provided doc ID.
- */
- void next(int doc) throws IOException {
- assert doc >= docBase + chunkDocs : doc + " " + docBase + " " + chunkDocs;
- fieldsStream.seek(indexReader.getStartPointer(doc));
-
- final int docBase = fieldsStream.readVInt();
- final int chunkDocs = fieldsStream.readVInt();
- if (docBase < this.docBase + this.chunkDocs
+ private void doReset(int docID) throws IOException {
+ docBase = fieldsStream.readVInt();
+ final int token = fieldsStream.readVInt();
+ chunkDocs = token >>> 1;
+ if (contains(docID) == false
|| docBase + chunkDocs > numDocs) {
- throw new CorruptIndexException("Corrupted: current docBase=" + this.docBase
- + ", current numDocs=" + this.chunkDocs + ", new docBase=" + docBase
- + ", new numDocs=" + chunkDocs, fieldsStream);
- }
- this.docBase = docBase;
- this.chunkDocs = chunkDocs;
-
- if (chunkDocs > numStoredFields.length) {
- final int newLength = ArrayUtil.oversize(chunkDocs, 4);
- numStoredFields = new int[newLength];
- lengths = new int[newLength];
+ throw new CorruptIndexException("Corrupted: docID=" + docID
+ + ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
+ + ", numDocs=" + numDocs, fieldsStream);
}
+ sliced = (token & 1) != 0;
+
+ offsets = ArrayUtil.grow(offsets, chunkDocs + 1);
+ numStoredFields = ArrayUtil.grow(numStoredFields, chunkDocs);
+
if (chunkDocs == 1) {
numStoredFields[0] = fieldsStream.readVInt();
- lengths[0] = fieldsStream.readVInt();
+ offsets[1] = fieldsStream.readVInt();
} else {
+ // Number of stored fields per document
final int bitsPerStoredFields = fieldsStream.readVInt();
if (bitsPerStoredFields == 0) {
Arrays.fill(numStoredFields, 0, chunkDocs, fieldsStream.readVInt());
@@ -557,52 +425,193 @@ public final class CompressingStoredFiel
}
}
+ // The stream encodes the length of each document and we decode
+ // it into a list of monotonically increasing offsets
final int bitsPerLength = fieldsStream.readVInt();
if (bitsPerLength == 0) {
- Arrays.fill(lengths, 0, chunkDocs, fieldsStream.readVInt());
- } else if (bitsPerLength > 31) {
+ final int length = fieldsStream.readVInt();
+ for (int i = 0; i < chunkDocs; ++i) {
+ offsets[1 + i] = (1 + i) * length;
+ }
+ } else if (bitsPerLength > 31) {
throw new CorruptIndexException("bitsPerLength=" + bitsPerLength, fieldsStream);
} else {
final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
for (int i = 0; i < chunkDocs; ++i) {
- lengths[i] = (int) it.next();
+ offsets[i + 1] = (int) it.next();
+ }
+ for (int i = 0; i < chunkDocs; ++i) {
+ offsets[i + 1] += offsets[i];
+ }
+ }
+
+ // Additional validation: only the empty document has a serialized length of 0
+ for (int i = 0; i < chunkDocs; ++i) {
+ final int len = offsets[i + 1] - offsets[i];
+ final int storedFields = numStoredFields[i];
+ if ((len == 0) != (storedFields == 0)) {
+ throw new CorruptIndexException("length=" + len + ", numStoredFields=" + storedFields, fieldsStream);
+ }
+ }
+
+ }
+
+ startPointer = fieldsStream.getFilePointer();
+
+ if (merging) {
+ final int totalLength = offsets[chunkDocs];
+ // decompress eagerly
+ if (sliced) {
+ bytes.offset = bytes.length = 0;
+ for (int decompressed = 0; decompressed < totalLength; ) {
+ final int toDecompress = Math.min(totalLength - decompressed, chunkSize);
+ decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
+ bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
+ System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);
+ bytes.length += spare.length;
+ decompressed += toDecompress;
}
+ } else {
+ decompressor.decompress(fieldsStream, totalLength, 0, totalLength, bytes);
+ }
+ if (bytes.length != totalLength) {
+ throw new CorruptIndexException("Corrupted: expected chunk size = " + totalLength + ", got " + bytes.length, fieldsStream);
}
}
}
/**
- * Decompress the chunk.
+ * Get the serialized representation of the given docID. This docID has
+ * to be contained in the current block.
*/
- void decompress() throws IOException {
- // decompress data
- final int chunkSize = chunkSize();
- if (chunkSize >= 2 * CompressingStoredFieldsReader.this.chunkSize) {
- bytes.offset = bytes.length = 0;
- for (int decompressed = 0; decompressed < chunkSize; ) {
- final int toDecompress = Math.min(chunkSize - decompressed, CompressingStoredFieldsReader.this.chunkSize);
- decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
- bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
- System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);
- bytes.length += spare.length;
- decompressed += toDecompress;
- }
+ SerializedDocument document(int docID) throws IOException {
+ if (contains(docID) == false) {
+ throw new IllegalArgumentException();
+ }
+
+ final int index = docID - docBase;
+ final int offset = offsets[index];
+ final int length = offsets[index+1] - offset;
+ final int totalLength = offsets[chunkDocs];
+ final int numStoredFields = this.numStoredFields[index];
+
+ fieldsStream.seek(startPointer);
+
+ final DataInput documentInput;
+ if (length == 0) {
+ // empty
+ documentInput = new ByteArrayDataInput();
+ } else if (merging) {
+ // already decompressed
+ documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
+ } else if (sliced) {
+ decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
+ documentInput = new DataInput() {
+
+ int decompressed = bytes.length;
+
+ void fillBuffer() throws IOException {
+ assert decompressed <= length;
+ if (decompressed == length) {
+ throw new EOFException();
+ }
+ final int toDecompress = Math.min(length - decompressed, chunkSize);
+ decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
+ decompressed += toDecompress;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ if (bytes.length == 0) {
+ fillBuffer();
+ }
+ --bytes.length;
+ return bytes.bytes[bytes.offset++];
+ }
+
+ @Override
+ public void readBytes(byte[] b, int offset, int len) throws IOException {
+ while (len > bytes.length) {
+ System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
+ len -= bytes.length;
+ offset += bytes.length;
+ fillBuffer();
+ }
+ System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
+ bytes.offset += len;
+ bytes.length -= len;
+ }
+
+ };
} else {
- decompressor.decompress(fieldsStream, chunkSize, 0, chunkSize, bytes);
- }
- if (bytes.length != chunkSize) {
- throw new CorruptIndexException("Corrupted: expected chunk size = " + chunkSize() + ", got " + bytes.length, fieldsStream);
+ decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
+ assert bytes.length == length;
+ documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
}
+
+ return new SerializedDocument(documentInput, length, numStoredFields);
}
- /**
- * Check integrity of the data. The iterator is not usable after this method has been called.
- */
- void checkIntegrity() throws IOException {
- fieldsStream.seek(fieldsStream.length() - CodecUtil.footerLength());
- CodecUtil.checkFooter(fieldsStream);
+ }
+
+ SerializedDocument document(int docID) throws IOException {
+ if (state.contains(docID) == false) {
+ fieldsStream.seek(indexReader.getStartPointer(docID));
+ state.reset(docID);
}
+ assert state.contains(docID);
+ return state.document(docID);
+ }
+
+ @Override
+ public void visitDocument(int docID, StoredFieldVisitor visitor)
+ throws IOException {
+
+ final SerializedDocument doc = document(docID);
+ for (int fieldIDX = 0; fieldIDX < doc.numStoredFields; fieldIDX++) {
+ final long infoAndBits = doc.in.readVLong();
+ final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
+ final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);
+
+ final int bits = (int) (infoAndBits & TYPE_MASK);
+ assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits);
+
+ switch(visitor.needsField(fieldInfo)) {
+ case YES:
+ readField(doc.in, visitor, fieldInfo, bits);
+ break;
+ case NO:
+ skipField(doc.in, bits);
+ break;
+ case STOP:
+ return;
+ }
+ }
+ }
+
+ @Override
+ public StoredFieldsReader clone() {
+ ensureOpen();
+ return new CompressingStoredFieldsReader(this, false);
+ }
+
+ @Override
+ public StoredFieldsReader getMergeInstance() {
+ ensureOpen();
+ return new CompressingStoredFieldsReader(this, true);
+ }
+
+ int getVersion() {
+ return version;
+ }
+
+ CompressionMode getCompressionMode() {
+ return compressionMode;
+ }
+
+ int getChunkSize() {
+ return chunkSize;
}
@Override
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java?rev=1646413&r1=1646412&r2=1646413&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java Thu Dec 18 09:13:21 2014
@@ -23,9 +23,8 @@ import java.util.Arrays;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
-import org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.ChunkIterator;
+import org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.SerializedDocument;
import org.apache.lucene.document.DocumentStoredFieldVisitor;
-import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexFileNames;
@@ -192,10 +191,12 @@ public final class CompressingStoredFiel
}
}
- private void writeHeader(int docBase, int numBufferedDocs, int[] numStoredFields, int[] lengths) throws IOException {
+ private void writeHeader(int docBase, int numBufferedDocs, int[] numStoredFields, int[] lengths, boolean sliced) throws IOException {
+ final int slicedBit = sliced ? 1 : 0;
+
// save docBase and numBufferedDocs
fieldsStream.writeVInt(docBase);
- fieldsStream.writeVInt(numBufferedDocs);
+ fieldsStream.writeVInt((numBufferedDocs) << 1 | slicedBit);
// save numStoredFields
saveInts(numStoredFields, numBufferedDocs, fieldsStream);
@@ -218,10 +219,11 @@ public final class CompressingStoredFiel
lengths[i] = endOffsets[i] - endOffsets[i - 1];
assert lengths[i] >= 0;
}
- writeHeader(docBase, numBufferedDocs, numStoredFields, lengths);
+ final boolean sliced = bufferedDocs.length >= 2 * chunkSize;
+ writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced);
// compress stored fields to fieldsStream
- if (bufferedDocs.length >= 2 * chunkSize) {
+ if (sliced) {
// big chunk, slice it
for (int compressed = 0; compressed < bufferedDocs.length; compressed += chunkSize) {
compressor.compress(bufferedDocs.bytes, compressed, Math.min(chunkSize, bufferedDocs.length - compressed), fieldsStream);
@@ -493,62 +495,35 @@ public final class CompressingStoredFiel
if (storedFieldsReader != null) {
storedFieldsReader.checkIntegrity();
}
- for (int i = nextLiveDoc(0, liveDocs, maxDoc); i < maxDoc; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) {
+ for (int docID = 0; docID < maxDoc; docID++) {
+ if (liveDocs != null && liveDocs.get(docID) == false) {
+ continue;
+ }
DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
- storedFieldsReader.visitDocument(i, visitor);
+ storedFieldsReader.visitDocument(docID, visitor);
addDocument(visitor.getDocument(), mergeState.mergeFieldInfos);
++docCount;
mergeState.checkAbort.work(300);
}
} else {
- int docID = nextLiveDoc(0, liveDocs, maxDoc);
- if (docID < maxDoc) {
- // not all docs were deleted
- final ChunkIterator it = matchingFieldsReader.chunkIterator(docID);
- int[] startOffsets = new int[0];
- do {
- // go to the next chunk that contains docID
- it.next(docID);
- // transform lengths into offsets
- if (startOffsets.length < it.chunkDocs) {
- startOffsets = new int[ArrayUtil.oversize(it.chunkDocs, 4)];
- }
- for (int i = 1; i < it.chunkDocs; ++i) {
- startOffsets[i] = startOffsets[i - 1] + it.lengths[i - 1];
- }
-
- // decompress
- it.decompress();
- if (startOffsets[it.chunkDocs - 1] + it.lengths[it.chunkDocs - 1] != it.bytes.length) {
- throw new CorruptIndexException("Corrupted: expected chunk size=" + startOffsets[it.chunkDocs - 1] + it.lengths[it.chunkDocs - 1] + ", got " + it.bytes.length, it.fieldsStream);
- }
- // copy non-deleted docs
- for (; docID < it.docBase + it.chunkDocs; docID = nextLiveDoc(docID + 1, liveDocs, maxDoc)) {
- final int diff = docID - it.docBase;
- startDocument();
- bufferedDocs.writeBytes(it.bytes.bytes, it.bytes.offset + startOffsets[diff], it.lengths[diff]);
- numStoredFieldsInDoc = it.numStoredFields[diff];
- finishDocument();
- ++docCount;
- mergeState.checkAbort.work(300);
- }
- } while (docID < maxDoc);
-
- it.checkIntegrity();
+ // optimized merge: we copy serialized (but decompressed) bytes directly;
+ // even on simple docs (1 stored field), it seems to help by about 20%
+ matchingFieldsReader.checkIntegrity();
+ for (int docID = 0; docID < maxDoc; docID++) {
+ if (liveDocs != null && liveDocs.get(docID) == false) {
+ continue;
+ }
+ SerializedDocument doc = matchingFieldsReader.document(docID);
+ startDocument();
+ bufferedDocs.copyBytes(doc.in, doc.length);
+ numStoredFieldsInDoc = doc.numStoredFields;
+ finishDocument();
+ ++docCount;
+ mergeState.checkAbort.work(300);
}
}
}
finish(mergeState.mergeFieldInfos, docCount);
return docCount;
}
-
- private static int nextLiveDoc(int doc, Bits liveDocs, int maxDoc) {
- if (liveDocs == null) {
- return doc;
- }
- while (doc < maxDoc && !liveDocs.get(doc)) {
- ++doc;
- }
- return doc;
- }
}
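The writeHeader() change above packs the buffered doc count and the new "sliced" flag into a single vInt token, which BlockState.doReset() on the reader side unpacks again. A minimal round-trip sketch of that layout (the class name and values are illustrative, not from the commit):

    final class ChunkTokenSketch {
      public static void main(String[] args) {
        final int numBufferedDocs = 128;
        final boolean sliced = true;
        // writer side: doc count and sliced bit share one vInt
        final int token = (numBufferedDocs << 1) | (sliced ? 1 : 0);
        // reader side: recover both values
        final int chunkDocs = token >>> 1;          // 128
        final boolean wasSliced = (token & 1) != 0; // true
        System.out.println(chunkDocs + " sliced=" + wasSliced);
      }
    }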
Modified: lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java?rev=1646413&r1=1646412&r2=1646413&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java (original)
+++ lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/codecs/compressing/CompressingCodec.java Thu Dec 18 09:13:21 2014
@@ -56,7 +56,9 @@ public abstract class CompressingCodec e
* suffix
*/
public static CompressingCodec randomInstance(Random random) {
- return randomInstance(random, RandomInts.randomIntBetween(random, 1, 1 << 15), RandomInts.randomIntBetween(random, 64, 1024), false);
+ final int chunkSize = random.nextBoolean() ? RandomInts.randomIntBetween(random, 1, 10) : RandomInts.randomIntBetween(random, 1, 1 << 15);
+ final int chunkDocs = random.nextBoolean() ? RandomInts.randomIntBetween(random, 1, 10) : RandomInts.randomIntBetween(random, 64, 1024);
+ return randomInstance(random, chunkSize, chunkDocs, false);
}
/**
Modified: lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/BaseStoredFieldsFormatTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/BaseStoredFieldsFormatTestCase.java?rev=1646413&r1=1646412&r2=1646413&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/BaseStoredFieldsFormatTestCase.java (original)
+++ lucene/dev/trunk/lucene/test-framework/src/java/org/apache/lucene/index/BaseStoredFieldsFormatTestCase.java Thu Dec 18 09:13:21 2014
@@ -57,8 +57,10 @@ import org.apache.lucene.store.MockDirec
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.TestUtil;
+
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
/**
* Base class aiming at testing {@link StoredFieldsFormat stored fields formats}.
@@ -579,7 +581,94 @@ public abstract class BaseStoredFieldsFo
iw.close();
dir.close();
}
-
+
+ /** A dummy filter reader that reverses the order of documents in stored fields. */
+ private static class DummyFilterLeafReader extends FilterLeafReader {
+
+ public DummyFilterLeafReader(LeafReader in) {
+ super(in);
+ }
+
+ @Override
+ public void document(int docID, StoredFieldVisitor visitor) throws IOException {
+ super.document(maxDoc() - 1 - docID, visitor);
+ }
+
+ }
+
+ private static class DummyFilterDirectoryReader extends FilterDirectoryReader {
+
+ public DummyFilterDirectoryReader(DirectoryReader in) {
+ super(in, new SubReaderWrapper() {
+ @Override
+ public LeafReader wrap(LeafReader reader) {
+ return new DummyFilterLeafReader(reader);
+ }
+ });
+ }
+
+ @Override
+ protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
+ return new DummyFilterDirectoryReader(in);
+ }
+
+ }
+
+ public void testMergeFilterReader() throws IOException {
+ Directory dir = newDirectory();
+ RandomIndexWriter w = new RandomIndexWriter(random(), dir);
+ final int numDocs = atLeast(200);
+ final String[] stringValues = new String[10];
+ for (int i = 0; i < stringValues.length; ++i) {
+ stringValues[i] = RandomStrings.randomRealisticUnicodeOfLength(random(), 10);
+ }
+ Document[] docs = new Document[numDocs];
+ for (int i = 0; i < numDocs; ++i) {
+ Document doc = new Document();
+ doc.add(new StringField("to_delete", random().nextBoolean() ? "yes" : "no", Store.NO));
+ doc.add(new StoredField("id", i));
+ doc.add(new StoredField("i", random().nextInt(50)));
+ doc.add(new StoredField("l", random().nextLong()));
+ doc.add(new StoredField("d", random().nextDouble()));
+ doc.add(new StoredField("f", random().nextFloat()));
+ doc.add(new StoredField("s", RandomPicks.randomFrom(random(), stringValues)));
+ doc.add(new StoredField("b", new BytesRef(RandomPicks.randomFrom(random(), stringValues))));
+ docs[i] = doc;
+ w.addDocument(doc);
+ }
+ if (random().nextBoolean()) {
+ w.deleteDocuments(new Term("to_delete", "yes"));
+ }
+ w.commit();
+ w.close();
+
+ DirectoryReader reader = new DummyFilterDirectoryReader(DirectoryReader.open(dir));
+
+ Directory dir2 = newDirectory();
+ w = new RandomIndexWriter(random(), dir2);
+ w.addIndexes(reader);
+ reader.close();
+ dir.close();
+
+ reader = w.getReader();
+ for (int i = 0; i < reader.maxDoc(); ++i) {
+ final StoredDocument doc = reader.document(i);
+ final int id = doc.getField("id").numericValue().intValue();
+ final Document expected = docs[id];
+ assertEquals(expected.get("s"), doc.get("s"));
+ assertEquals(expected.getField("i").numericValue(), doc.getField("i").numericValue());
+ assertEquals(expected.getField("l").numericValue(), doc.getField("l").numericValue());
+ assertEquals(expected.getField("d").numericValue(), doc.getField("d").numericValue());
+ assertEquals(expected.getField("f").numericValue(), doc.getField("f").numericValue());
+ assertEquals(expected.getField("b").binaryValue(), doc.getField("b").binaryValue());
+ }
+
+ reader.close();
+ w.close();
+ TestUtil.checkIndex(dir2);
+ dir2.close();
+ }
+
@Nightly
public void testBigDocuments() throws IOException {
// "big" as "much bigger than the chunk size"