Posted to commits@lucene.apache.org by jp...@apache.org on 2013/09/04 16:00:34 UTC
svn commit: r1520025 - in /lucene/dev/trunk/lucene: ./
core/src/java/org/apache/lucene/codecs/compressing/
core/src/java/org/apache/lucene/codecs/lucene41/
Author: jpountz
Date: Wed Sep 4 14:00:34 2013
New Revision: 1520025
URL: http://svn.apache.org/r1520025
Log:
LUCENE-5188: Make CompressingStoredFieldsFormat more friendly to StoredFieldVisitors.
Modified:
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1520025&r1=1520024&r2=1520025&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Wed Sep 4 14:00:34 2013
@@ -259,6 +259,10 @@ Optimizations
* LUCENE-5182: Terminate phrase searches early if max phrase window is
exceeded in FastVectorHighlighter to prevent very long running phrase
extraction if phrase terms are highly frequent. (Simon Willnauer)
+
+* LUCENE-5188: CompressingStoredFieldsFormat now slices chunks containing big
+ documents into fixed-size blocks so that requesting a single field does not
+  necessarily require decompressing the whole chunk. (Adrien Grand)
Documentation
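
For a sense of scale, here is a rough standalone sketch of what the entry above buys, assuming the 16KB block size that the Lucene41 format below uses (class and variable names here are illustrative, not Lucene API):

    public class BigChunksMath {
      public static void main(String[] args) {
        final int blockSize = 1 << 14;     // 16KB blocks, the Lucene41 default
        final long docLength = 10L << 20;  // a hypothetical 10MB document
        final long numBlocks = (docLength + blockSize - 1) / blockSize;
        System.out.println(numBlocks);     // 640 blocks
        // A StoredFieldVisitor that only needs the first field now
        // decompresses a single 16KB block instead of the whole 10MB chunk.
      }
    }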
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java?rev=1520025&r1=1520024&r2=1520025&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java Wed Sep 4 14:00:34 2013
@@ -27,11 +27,13 @@ import static org.apache.lucene.codecs.c
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.STRING;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.TYPE_BITS;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.TYPE_MASK;
+import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_BIG_CHUNKS;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_CURRENT;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_START;
import static org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsWriter.FIELDS_EXTENSION;
import static org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsWriter.FIELDS_INDEX_EXTENSION;
+import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
@@ -45,6 +47,7 @@ import org.apache.lucene.index.SegmentIn
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@@ -63,9 +66,23 @@ public final class CompressingStoredFiel
// Do not reuse the decompression buffer when there is more than 32kb to decompress
private static final int BUFFER_REUSE_THRESHOLD = 1 << 15;
+ private static final byte[] SKIP_BUFFER = new byte[1024];
+
+ // TODO: should this be a method on DataInput?
+ private static void skipBytes(DataInput in, long numBytes) throws IOException {
+ assert numBytes >= 0;
+ for (long skipped = 0; skipped < numBytes; ) {
+ final int toRead = (int) Math.min(numBytes - skipped, SKIP_BUFFER.length);
+ in.readBytes(SKIP_BUFFER, 0, toRead);
+ skipped += toRead;
+ }
+ }
+
+ private final int version;
private final FieldInfos fieldInfos;
private final CompressingStoredFieldsIndexReader indexReader;
private final IndexInput fieldsStream;
+ private final int chunkSize;
private final int packedIntsVersion;
private final CompressionMode compressionMode;
private final Decompressor decompressor;
@@ -75,9 +92,11 @@ public final class CompressingStoredFiel
// used by clone
private CompressingStoredFieldsReader(CompressingStoredFieldsReader reader) {
+ this.version = reader.version;
this.fieldInfos = reader.fieldInfos;
this.fieldsStream = reader.fieldsStream.clone();
this.indexReader = reader.indexReader.clone();
+ this.chunkSize = reader.chunkSize;
this.packedIntsVersion = reader.packedIntsVersion;
this.compressionMode = reader.compressionMode;
this.decompressor = reader.decompressor.clone();
@@ -100,7 +119,7 @@ public final class CompressingStoredFiel
final String indexStreamFN = IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_INDEX_EXTENSION);
indexStream = d.openInput(indexStreamFN, context);
final String codecNameIdx = formatName + CODEC_SFX_IDX;
- CodecUtil.checkHeader(indexStream, codecNameIdx, VERSION_START, VERSION_CURRENT);
+ version = CodecUtil.checkHeader(indexStream, codecNameIdx, VERSION_START, VERSION_CURRENT);
assert CodecUtil.headerLength(codecNameIdx) == indexStream.getFilePointer();
indexReader = new CompressingStoredFieldsIndexReader(indexStream, si);
indexStream.close();
@@ -110,9 +129,17 @@ public final class CompressingStoredFiel
final String fieldsStreamFN = IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_EXTENSION);
fieldsStream = d.openInput(fieldsStreamFN, context);
final String codecNameDat = formatName + CODEC_SFX_DAT;
- CodecUtil.checkHeader(fieldsStream, codecNameDat, VERSION_START, VERSION_CURRENT);
+ final int fieldsVersion = CodecUtil.checkHeader(fieldsStream, codecNameDat, VERSION_START, VERSION_CURRENT);
+ if (version != fieldsVersion) {
+ throw new CorruptIndexException("Version mismatch between stored fields index and data: " + version + " != " + fieldsVersion);
+ }
assert CodecUtil.headerLength(codecNameDat) == fieldsStream.getFilePointer();
+ if (version >= VERSION_BIG_CHUNKS) {
+ chunkSize = fieldsStream.readVInt();
+ } else {
+ chunkSize = -1;
+ }
packedIntsVersion = fieldsStream.readVInt();
decompressor = compressionMode.newDecompressor();
this.bytes = new BytesRef();
@@ -145,7 +172,7 @@ public final class CompressingStoredFiel
}
}
- private static void readField(ByteArrayDataInput in, StoredFieldVisitor visitor, FieldInfo info, int bits) throws IOException {
+ private static void readField(DataInput in, StoredFieldVisitor visitor, FieldInfo info, int bits) throws IOException {
switch (bits & TYPE_MASK) {
case BYTE_ARR:
int length = in.readVInt();
@@ -176,12 +203,12 @@ public final class CompressingStoredFiel
}
}
- private static void skipField(ByteArrayDataInput in, int bits) throws IOException {
+ private static void skipField(DataInput in, int bits) throws IOException {
switch (bits & TYPE_MASK) {
case BYTE_ARR:
case STRING:
final int length = in.readVInt();
- in.skipBytes(length);
+ skipBytes(in, length);
break;
case NUMERIC_INT:
case NUMERIC_FLOAT:
@@ -261,11 +288,56 @@ public final class CompressingStoredFiel
return;
}
- final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
- decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
- assert bytes.length == length;
+ final DataInput documentInput;
+ if (version >= VERSION_BIG_CHUNKS && totalLength >= 2 * chunkSize) {
+ assert chunkSize > 0;
+ assert offset < chunkSize;
+
+ decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
+ documentInput = new DataInput() {
+
+ int decompressed = bytes.length;
+
+ void fillBuffer() throws IOException {
+ assert decompressed <= length;
+ if (decompressed == length) {
+ throw new EOFException();
+ }
+ final int toDecompress = Math.min(length - decompressed, chunkSize);
+ decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
+ decompressed += toDecompress;
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ if (bytes.length == 0) {
+ fillBuffer();
+ }
+ --bytes.length;
+ return bytes.bytes[bytes.offset++];
+ }
+
+ @Override
+ public void readBytes(byte[] b, int offset, int len) throws IOException {
+ while (len > bytes.length) {
+ System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
+ len -= bytes.length;
+ offset += bytes.length;
+ fillBuffer();
+ }
+ System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
+ bytes.offset += len;
+ bytes.length -= len;
+ }
+
+ };
+ } else {
+ final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
+ decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
+ assert bytes.length == length;
+ documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
+ }
- final ByteArrayDataInput documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) {
final long infoAndBits = documentInput.readVLong();
final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
@@ -277,17 +349,14 @@ public final class CompressingStoredFiel
switch(visitor.needsField(fieldInfo)) {
case YES:
readField(documentInput, visitor, fieldInfo, bits);
- assert documentInput.getPosition() <= bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + bytes.length;
break;
case NO:
skipField(documentInput, bits);
- assert documentInput.getPosition() <= bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + bytes.length;
break;
case STOP:
return;
}
}
- assert documentInput.getPosition() == bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + " " + bytes.length;
}
@Override
@@ -296,6 +365,10 @@ public final class CompressingStoredFiel
return new CompressingStoredFieldsReader(this);
}
+ int getVersion() {
+ return version;
+ }
+
CompressionMode getCompressionMode() {
return compressionMode;
}
@@ -308,6 +381,7 @@ public final class CompressingStoredFiel
final class ChunkIterator {
+ BytesRef spare;
BytesRef bytes;
int docBase;
int chunkDocs;
@@ -317,6 +391,7 @@ public final class CompressingStoredFiel
private ChunkIterator() {
this.docBase = -1;
bytes = new BytesRef();
+ spare = new BytesRef();
numStoredFields = new int[1];
lengths = new int[1];
}
@@ -392,7 +467,19 @@ public final class CompressingStoredFiel
void decompress() throws IOException {
// decompress data
final int chunkSize = chunkSize();
- decompressor.decompress(fieldsStream, chunkSize, 0, chunkSize, bytes);
+ if (version >= VERSION_BIG_CHUNKS && chunkSize >= 2 * CompressingStoredFieldsReader.this.chunkSize) {
+ bytes.offset = bytes.length = 0;
+ for (int decompressed = 0; decompressed < chunkSize; ) {
+ final int toDecompress = Math.min(chunkSize - decompressed, CompressingStoredFieldsReader.this.chunkSize);
+ decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
+ bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
+ System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);
+ bytes.length += spare.length;
+ decompressed += toDecompress;
+ }
+ } else {
+ decompressor.decompress(fieldsStream, chunkSize, 0, chunkSize, bytes);
+ }
if (bytes.length != chunkSize) {
throw new CorruptIndexException("Corrupted: expected chunk size = " + chunkSize() + ", got " + bytes.length + " (resource=" + fieldsStream + ")");
}
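
The anonymous DataInput in the hunk above decompresses one block at a time and only pulls the next block when the visitor actually reads past the current one. Below is a self-contained sketch of that lazy, block-at-a-time idea; java.util.zip stands in for Lucene's LZ4 decompressor, and the class and method names are hypothetical:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Iterator;
    import java.util.List;
    import java.util.zip.DeflaterOutputStream;
    import java.util.zip.InflaterInputStream;

    final class LazyBlockInput extends InputStream {
      private final Iterator<byte[]> blocks;  // independently compressed blocks
      private InputStream current = new ByteArrayInputStream(new byte[0]);

      LazyBlockInput(List<byte[]> compressedBlocks) {
        this.blocks = compressedBlocks.iterator();
      }

      @Override
      public int read() throws IOException {
        int b;
        while ((b = current.read()) < 0) {  // current block exhausted
          if (!blocks.hasNext()) {
            return -1;  // end of document, like the EOFException above
          }
          // inflate the next block only now that a caller actually needs it
          current = new InflaterInputStream(new ByteArrayInputStream(blocks.next()));
        }
        return b;
      }

      /** Helper to produce an independently decodable block. */
      static byte[] deflate(byte[] block) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out)) {
          dos.write(block);
        }
        return out.toByteArray();
      }
    }

A caller that stops reading after the first few fields never touches the iterator again, so trailing blocks are never inflated, mirroring what skipField and STOP achieve in the patch.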
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java?rev=1520025&r1=1520024&r2=1520025&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java Wed Sep 4 14:00:34 2013
@@ -70,7 +70,8 @@ public final class CompressingStoredFiel
static final String CODEC_SFX_IDX = "Index";
static final String CODEC_SFX_DAT = "Data";
static final int VERSION_START = 0;
- static final int VERSION_CURRENT = VERSION_START;
+ static final int VERSION_BIG_CHUNKS = 1;
+ static final int VERSION_CURRENT = VERSION_BIG_CHUNKS;
private final Directory directory;
private final String segment;
@@ -119,6 +120,7 @@ public final class CompressingStoredFiel
indexWriter = new CompressingStoredFieldsIndexWriter(indexStream);
indexStream = null;
+ fieldsStream.writeVInt(chunkSize);
fieldsStream.writeVInt(PackedInts.VERSION_CURRENT);
success = true;
@@ -219,7 +221,14 @@ public final class CompressingStoredFiel
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths);
// compress stored fields to fieldsStream
- compressor.compress(bufferedDocs.bytes, 0, bufferedDocs.length, fieldsStream);
+ if (bufferedDocs.length >= 2 * chunkSize) {
+ // big chunk, slice it
+ for (int compressed = 0; compressed < bufferedDocs.length; compressed += chunkSize) {
+ compressor.compress(bufferedDocs.bytes, compressed, Math.min(chunkSize, bufferedDocs.length - compressed), fieldsStream);
+ }
+ } else {
+ compressor.compress(bufferedDocs.bytes, 0, bufferedDocs.length, fieldsStream);
+ }
// reset
docBase += numBufferedDocs;
@@ -327,7 +336,8 @@ public final class CompressingStoredFiel
final int maxDoc = reader.maxDoc();
final Bits liveDocs = reader.getLiveDocs();
- if (matchingFieldsReader == null) {
+ if (matchingFieldsReader == null
+ || matchingFieldsReader.getVersion() != VERSION_CURRENT) { // means reader version is not the same as the writer version
// naive merge...
for (int i = nextLiveDoc(0, liveDocs, maxDoc); i < maxDoc; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) {
StoredDocument doc = reader.document(i);
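
On the write side, the change above slices the buffered chunk into independently compressed chunkSize-byte blocks once it reaches twice the chunk size. A minimal standalone sketch of that policy follows; java.util.zip's Deflater stands in for the LZ4 compressor, and the names are hypothetical, not Lucene API:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.DeflaterOutputStream;

    final class SlicingWriterSketch {
      static void writeChunk(byte[] buffered, int chunkSize, OutputStream out) throws IOException {
        if (buffered.length >= 2 * chunkSize) {
          // big chunk: emit fixed-size, independently decodable slices so
          // that readers can decompress one slice at a time
          for (int off = 0; off < buffered.length; off += chunkSize) {
            compress(buffered, off, Math.min(chunkSize, buffered.length - off), out);
          }
        } else {
          // small chunk: a single compressed block, as before
          compress(buffered, 0, buffered.length, out);
        }
      }

      private static void compress(byte[] b, int off, int len, OutputStream out) throws IOException {
        DeflaterOutputStream dos = new DeflaterOutputStream(out);
        dos.write(b, off, len);
        dos.finish(); // complete this block without closing the underlying stream
      }
    }

The 2 * chunkSize threshold means small chunks keep the old single-block layout, so slicing only kicks in where it can actually save decompression work.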
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java?rev=1520025&r1=1520024&r2=1520025&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java Wed Sep 4 14:00:34 2013
@@ -23,6 +23,7 @@ import org.apache.lucene.codecs.compress
import org.apache.lucene.codecs.compressing.CompressingStoredFieldsIndexWriter;
import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsFormat;
+import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.packed.PackedInts;
@@ -88,6 +89,11 @@ import org.apache.lucene.util.packed.Pac
* <li>If documents are larger than 16KB then chunks will likely contain only
* one document. However, documents never span several chunks (all
* fields of a single document are in the same chunk).</li>
+ * <li>When at least one document in a chunk is large enough that the chunk
+ * is larger than 32KB, the chunk is actually compressed as several LZ4
+ * blocks of 16KB. This allows {@link StoredFieldVisitor}s that are only
+ * interested in the first fields of a document to avoid decompressing the
+ * whole document: for a 10MB document, only 16KB needs to be decompressed.</li>
* <li>Given that the original lengths are written in the metadata of the chunk,
* the decompressor can leverage this information to stop decoding as soon as
* enough data has been decompressed.</li>
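
The Javadoc item about 16KB LZ4 blocks above is the user-visible payoff. Here is a minimal sketch of a visitor that benefits; the field name "title" and the assumption that it is stored early in the document are illustrative:

    import java.io.IOException;
    import org.apache.lucene.index.FieldInfo;
    import org.apache.lucene.index.StoredFieldVisitor;

    final class TitleOnlyVisitor extends StoredFieldVisitor {
      String title;

      @Override
      public Status needsField(FieldInfo fieldInfo) throws IOException {
        if ("title".equals(fieldInfo.name)) {
          return Status.YES;
        }
        // once the title has been read, STOP: with this patch the reader
        // skips the remaining blocks instead of decompressing them
        return title == null ? Status.NO : Status.STOP;
      }

      @Override
      public void stringField(FieldInfo fieldInfo, String value) throws IOException {
        title = value;
      }
    }

Used as reader.document(docID, new TitleOnlyVisitor()), such a visitor on a 10MB document now decompresses only the leading 16KB block(s) up to the title field rather than the whole chunk.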