You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by jp...@apache.org on 2013/09/04 16:00:34 UTC

svn commit: r1520025 - in /lucene/dev/trunk/lucene: ./ core/src/java/org/apache/lucene/codecs/compressing/ core/src/java/org/apache/lucene/codecs/lucene41/

Author: jpountz
Date: Wed Sep  4 14:00:34 2013
New Revision: 1520025

URL: http://svn.apache.org/r1520025
Log:
LUCENE-5188: Make CompressingStoredFieldsFormat more friendly to StoredFieldVisitors.

Modified:
    lucene/dev/trunk/lucene/CHANGES.txt
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
    lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java

Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1520025&r1=1520024&r2=1520025&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Wed Sep  4 14:00:34 2013
@@ -259,6 +259,10 @@ Optimizations
 * LUCENE-5182: Terminate phrase searches early if max phrase window is 
   exceeded in FastVectorHighlighter to prevent very long running phrase
   extraction if phrase terms are high frequent. (Simon Willnauer)
+
+* LUCENE-5188: CompressingStoredFieldsFormat now slices chunks containing big
+  documents into fixed-size blocks so that requesting a single field does not
+  necessarily force to decompress the whole chunk. (Adrien Grand)
   
 Documentation
 

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java?rev=1520025&r1=1520024&r2=1520025&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java Wed Sep  4 14:00:34 2013
@@ -27,11 +27,13 @@ import static org.apache.lucene.codecs.c
 import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.STRING;
 import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.TYPE_BITS;
 import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.TYPE_MASK;
+import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_BIG_CHUNKS;
 import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_CURRENT;
 import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_START;
 import static org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsWriter.FIELDS_EXTENSION;
 import static org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsWriter.FIELDS_INDEX_EXTENSION;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -45,6 +47,7 @@ import org.apache.lucene.index.SegmentIn
 import org.apache.lucene.index.StoredFieldVisitor;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.ByteArrayDataInput;
+import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -63,9 +66,23 @@ public final class CompressingStoredFiel
   // Do not reuse the decompression buffer when there is more than 32kb to decompress
   private static final int BUFFER_REUSE_THRESHOLD = 1 << 15;
 
+  private static final byte[] SKIP_BUFFER = new byte[1024];
+
+  // TODO: should this be a method on DataInput?
+  private static void skipBytes(DataInput in, long numBytes) throws IOException {
+    assert numBytes >= 0;
+    for (long skipped = 0; skipped < numBytes; ) {
+      final int toRead = (int) Math.min(numBytes - skipped, SKIP_BUFFER.length);
+      in.readBytes(SKIP_BUFFER, 0, toRead);
+      skipped += toRead;
+    }
+  }
+
+  private final int version;
   private final FieldInfos fieldInfos;
   private final CompressingStoredFieldsIndexReader indexReader;
   private final IndexInput fieldsStream;
+  private final int chunkSize;
   private final int packedIntsVersion;
   private final CompressionMode compressionMode;
   private final Decompressor decompressor;
@@ -75,9 +92,11 @@ public final class CompressingStoredFiel
 
   // used by clone
   private CompressingStoredFieldsReader(CompressingStoredFieldsReader reader) {
+    this.version = reader.version;
     this.fieldInfos = reader.fieldInfos;
     this.fieldsStream = reader.fieldsStream.clone();
     this.indexReader = reader.indexReader.clone();
+    this.chunkSize = reader.chunkSize;
     this.packedIntsVersion = reader.packedIntsVersion;
     this.compressionMode = reader.compressionMode;
     this.decompressor = reader.decompressor.clone();
@@ -100,7 +119,7 @@ public final class CompressingStoredFiel
       final String indexStreamFN = IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_INDEX_EXTENSION);
       indexStream = d.openInput(indexStreamFN, context);
       final String codecNameIdx = formatName + CODEC_SFX_IDX;
-      CodecUtil.checkHeader(indexStream, codecNameIdx, VERSION_START, VERSION_CURRENT);
+      version = CodecUtil.checkHeader(indexStream, codecNameIdx, VERSION_START, VERSION_CURRENT);
       assert CodecUtil.headerLength(codecNameIdx) == indexStream.getFilePointer();
       indexReader = new CompressingStoredFieldsIndexReader(indexStream, si);
       indexStream.close();
@@ -110,9 +129,17 @@ public final class CompressingStoredFiel
       final String fieldsStreamFN = IndexFileNames.segmentFileName(segment, segmentSuffix, FIELDS_EXTENSION);
       fieldsStream = d.openInput(fieldsStreamFN, context);
       final String codecNameDat = formatName + CODEC_SFX_DAT;
-      CodecUtil.checkHeader(fieldsStream, codecNameDat, VERSION_START, VERSION_CURRENT);
+      final int fieldsVersion = CodecUtil.checkHeader(fieldsStream, codecNameDat, VERSION_START, VERSION_CURRENT);
+      if (version != fieldsVersion) {
+        throw new CorruptIndexException("Version mismatch between stored fields index and data: " + version + " != " + fieldsVersion);
+      }
       assert CodecUtil.headerLength(codecNameDat) == fieldsStream.getFilePointer();
 
+      if (version >= VERSION_BIG_CHUNKS) {
+        chunkSize = fieldsStream.readVInt();
+      } else {
+        chunkSize = -1;
+      }
       packedIntsVersion = fieldsStream.readVInt();
       decompressor = compressionMode.newDecompressor();
       this.bytes = new BytesRef();
@@ -145,7 +172,7 @@ public final class CompressingStoredFiel
     }
   }
 
-  private static void readField(ByteArrayDataInput in, StoredFieldVisitor visitor, FieldInfo info, int bits) throws IOException {
+  private static void readField(DataInput in, StoredFieldVisitor visitor, FieldInfo info, int bits) throws IOException {
     switch (bits & TYPE_MASK) {
       case BYTE_ARR:
         int length = in.readVInt();
@@ -176,12 +203,12 @@ public final class CompressingStoredFiel
     }
   }
 
-  private static void skipField(ByteArrayDataInput in, int bits) throws IOException {
+  private static void skipField(DataInput in, int bits) throws IOException {
     switch (bits & TYPE_MASK) {
       case BYTE_ARR:
       case STRING:
         final int length = in.readVInt();
-        in.skipBytes(length);
+        skipBytes(in, length);
         break;
       case NUMERIC_INT:
       case NUMERIC_FLOAT:
@@ -261,11 +288,56 @@ public final class CompressingStoredFiel
       return;
     }
 
-    final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
-    decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
-    assert bytes.length == length;
+    final DataInput documentInput;
+    if (version >= VERSION_BIG_CHUNKS && totalLength >= 2 * chunkSize) {
+      assert chunkSize > 0;
+      assert offset < chunkSize;
+
+      decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
+      documentInput = new DataInput() {
+
+        int decompressed = bytes.length;
+
+        void fillBuffer() throws IOException {
+          assert decompressed <= length;
+          if (decompressed == length) {
+            throw new EOFException();
+          }
+          final int toDecompress = Math.min(length - decompressed, chunkSize);
+          decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
+          decompressed += toDecompress;
+        }
+
+        @Override
+        public byte readByte() throws IOException {
+          if (bytes.length == 0) {
+            fillBuffer();
+          }
+          --bytes.length;
+          return bytes.bytes[bytes.offset++];
+        }
+
+        @Override
+        public void readBytes(byte[] b, int offset, int len) throws IOException {
+          while (len > bytes.length) {
+            System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
+            len -= bytes.length;
+            offset += bytes.length;
+            fillBuffer();
+          }
+          System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
+          bytes.offset += len;
+          bytes.length -= len;
+        }
+
+      };
+    } else {
+      final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
+      decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
+      assert bytes.length == length;
+      documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
+    }
 
-    final ByteArrayDataInput documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
     for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) {
       final long infoAndBits = documentInput.readVLong();
       final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
@@ -277,17 +349,14 @@ public final class CompressingStoredFiel
       switch(visitor.needsField(fieldInfo)) {
         case YES:
           readField(documentInput, visitor, fieldInfo, bits);
-          assert documentInput.getPosition() <= bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + bytes.length;
           break;
         case NO:
           skipField(documentInput, bits);
-          assert documentInput.getPosition() <= bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + bytes.length;
           break;
         case STOP:
           return;
       }
     }
-    assert documentInput.getPosition() == bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + " " + bytes.length;
   }
 
   @Override
@@ -296,6 +365,10 @@ public final class CompressingStoredFiel
     return new CompressingStoredFieldsReader(this);
   }
 
+  int getVersion() {
+    return version;
+  }
+
   CompressionMode getCompressionMode() {
     return compressionMode;
   }
@@ -308,6 +381,7 @@ public final class CompressingStoredFiel
 
   final class ChunkIterator {
 
+    BytesRef spare;
     BytesRef bytes;
     int docBase;
     int chunkDocs;
@@ -317,6 +391,7 @@ public final class CompressingStoredFiel
     private ChunkIterator() {
       this.docBase = -1;
       bytes = new BytesRef();
+      spare = new BytesRef();
       numStoredFields = new int[1];
       lengths = new int[1];
     }
@@ -392,7 +467,19 @@ public final class CompressingStoredFiel
     void decompress() throws IOException {
       // decompress data
       final int chunkSize = chunkSize();
-      decompressor.decompress(fieldsStream, chunkSize, 0, chunkSize, bytes);
+      if (version >= VERSION_BIG_CHUNKS && chunkSize >= 2 * CompressingStoredFieldsReader.this.chunkSize) {
+        bytes.offset = bytes.length = 0;
+        for (int decompressed = 0; decompressed < chunkSize; ) {
+          final int toDecompress = Math.min(chunkSize - decompressed, CompressingStoredFieldsReader.this.chunkSize);
+          decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
+          bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
+          System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);
+          bytes.length += spare.length;
+          decompressed += toDecompress;
+        }
+      } else {
+        decompressor.decompress(fieldsStream, chunkSize, 0, chunkSize, bytes);
+      }
       if (bytes.length != chunkSize) {
         throw new CorruptIndexException("Corrupted: expected chunk size = " + chunkSize() + ", got " + bytes.length + " (resource=" + fieldsStream + ")");
       }

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java?rev=1520025&r1=1520024&r2=1520025&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsWriter.java Wed Sep  4 14:00:34 2013
@@ -70,7 +70,8 @@ public final class CompressingStoredFiel
   static final String CODEC_SFX_IDX = "Index";
   static final String CODEC_SFX_DAT = "Data";
   static final int VERSION_START = 0;
-  static final int VERSION_CURRENT = VERSION_START;
+  static final int VERSION_BIG_CHUNKS = 1;
+  static final int VERSION_CURRENT = VERSION_BIG_CHUNKS;
 
   private final Directory directory;
   private final String segment;
@@ -119,6 +120,7 @@ public final class CompressingStoredFiel
       indexWriter = new CompressingStoredFieldsIndexWriter(indexStream);
       indexStream = null;
 
+      fieldsStream.writeVInt(chunkSize);
       fieldsStream.writeVInt(PackedInts.VERSION_CURRENT);
 
       success = true;
@@ -219,7 +221,14 @@ public final class CompressingStoredFiel
     writeHeader(docBase, numBufferedDocs, numStoredFields, lengths);
 
     // compress stored fields to fieldsStream
-    compressor.compress(bufferedDocs.bytes, 0, bufferedDocs.length, fieldsStream);
+    if (bufferedDocs.length >= 2 * chunkSize) {
+      // big chunk, slice it
+      for (int compressed = 0; compressed < bufferedDocs.length; compressed += chunkSize) {
+        compressor.compress(bufferedDocs.bytes, compressed, Math.min(chunkSize, bufferedDocs.length - compressed), fieldsStream);
+      }
+    } else {
+      compressor.compress(bufferedDocs.bytes, 0, bufferedDocs.length, fieldsStream);
+    }
 
     // reset
     docBase += numBufferedDocs;
@@ -327,7 +336,8 @@ public final class CompressingStoredFiel
       final int maxDoc = reader.maxDoc();
       final Bits liveDocs = reader.getLiveDocs();
 
-      if (matchingFieldsReader == null) {
+      if (matchingFieldsReader == null
+          || matchingFieldsReader.getVersion() != VERSION_CURRENT) { // means reader version is not the same as the writer version
         // naive merge...
         for (int i = nextLiveDoc(0, liveDocs, maxDoc); i < maxDoc; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) {
           StoredDocument doc = reader.document(i);

Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java?rev=1520025&r1=1520024&r2=1520025&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/codecs/lucene41/Lucene41StoredFieldsFormat.java Wed Sep  4 14:00:34 2013
@@ -23,6 +23,7 @@ import org.apache.lucene.codecs.compress
 import org.apache.lucene.codecs.compressing.CompressingStoredFieldsIndexWriter;
 import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsFormat;
+import org.apache.lucene.index.StoredFieldVisitor;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.util.packed.PackedInts;
 
@@ -88,6 +89,11 @@ import org.apache.lucene.util.packed.Pac
  * <li>If documents are larger than 16KB then chunks will likely contain only
  * one document. However, documents can never spread across several chunks (all
  * fields of a single document are in the same chunk).</li>
+ * <li>When at least one document in a chunk is large enough so that the chunk
+ * is larger than 32KB, the chunk will actually be compressed in several LZ4
+ * blocks of 16KB. This allows {@link StoredFieldVisitor}s which are only
+ * interested in the first fields of a document to not have to decompress 10MB
+ * of data if the document is 10MB, but only 16KB.</li>
  * <li>Given that the original lengths are written in the metadata of the chunk,
  * the decompressor can leverage this information to stop decoding as soon as
  * enough data has been decompressed.</li>