You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@lucene.apache.org by mi...@apache.org on 2011/01/14 13:03:23 UTC
svn commit: r1058973 - in /lucene/dev/branches/bulkpostings: ./ lucene/
lucene/contrib/instantiated/src/test/org/apache/lucene/store/instantiated/
lucene/src/java/org/apache/lucene/index/
lucene/src/java/org/apache/lucene/index/codecs/ lucene/src/java/...
Author: mikemccand
Date: Fri Jan 14 12:03:22 2011
New Revision: 1058973
URL: http://svn.apache.org/viewvc?rev=1058973&view=rev
Log:
LUCENE-2857: merged from trunk to the bulkpostings branch
Added:
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java
- copied, changed from r1057897, lucene/dev/trunk/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java
Modified:
lucene/dev/branches/bulkpostings/ (props changed)
lucene/dev/branches/bulkpostings/lucene/ (props changed)
lucene/dev/branches/bulkpostings/lucene/contrib/instantiated/src/test/org/apache/lucene/store/instantiated/TestIndicesEquals.java
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/BulkPostingsEnum.java
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PrefixCodedTermsReader.java
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/TermState.java
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsReaderImpl.java
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsWriterImpl.java
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/sep/SepPostingsReaderImpl.java
lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/standard/StandardPostingsReader.java
lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestIndexReader.java
lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/codecs/mockrandom/MockRandomCodec.java
lucene/dev/branches/bulkpostings/solr/ (props changed)
Modified: lucene/dev/branches/bulkpostings/lucene/contrib/instantiated/src/test/org/apache/lucene/store/instantiated/TestIndicesEquals.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/contrib/instantiated/src/test/org/apache/lucene/store/instantiated/TestIndicesEquals.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/contrib/instantiated/src/test/org/apache/lucene/store/instantiated/TestIndicesEquals.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/contrib/instantiated/src/test/org/apache/lucene/store/instantiated/TestIndicesEquals.java Fri Jan 14 12:03:22 2011
@@ -91,6 +91,10 @@ public class TestIndicesEquals extends L
// create dir data
IndexWriter indexWriter = new IndexWriter(dir, newIndexWriterConfig(
TEST_VERSION_CURRENT, new MockAnalyzer()));
+ indexWriter.setInfoStream(VERBOSE ? System.out : null);
+ if (VERBOSE) {
+ System.out.println("TEST: make test index");
+ }
for (int i = 0; i < 500; i++) {
Document document = new Document();
assembleDocument(document, i);
@@ -320,6 +324,9 @@ public class TestIndicesEquals extends L
protected void testEquals(Directory aprioriIndex, InstantiatedIndex testIndex) throws Exception {
+ if (VERBOSE) {
+ System.out.println("TEST: testEquals");
+ }
testTermDocsSomeMore(aprioriIndex, testIndex);
IndexReader aprioriReader = IndexReader.open(aprioriIndex, false);
@@ -401,6 +408,9 @@ public class TestIndicesEquals extends L
String aprioriField;
while((aprioriField = aprioriFieldsEnum.next()) != null) {
String testField = testFieldsEnum.next();
+ if (VERBOSE) {
+ System.out.println("TEST: verify field=" + testField);
+ }
assertEquals(aprioriField, testField);
TermsEnum aprioriTermEnum = aprioriFieldsEnum.terms();
@@ -409,6 +419,9 @@ public class TestIndicesEquals extends L
BytesRef aprioriText;
while((aprioriText = aprioriTermEnum.next()) != null) {
assertEquals(aprioriText, testTermEnum.next());
+ if (VERBOSE) {
+ System.out.println("TEST: verify term=" + aprioriText.utf8ToString());
+ }
assertTrue(aprioriTermEnum.docFreq() == testTermEnum.docFreq());
@@ -434,6 +447,10 @@ public class TestIndicesEquals extends L
assertEquals(DocsEnum.NO_MORE_DOCS, testTermDocs.nextDoc());
break;
}
+ if (VERBOSE) {
+ System.out.println("TEST: verify doc=" + aprioriTermDocs.docID());
+ }
+
assertTrue(testTermDocs.nextDoc() != DocsEnum.NO_MORE_DOCS);
assertEquals(aprioriTermDocs.docID(), testTermDocs.docID());
@@ -445,12 +462,19 @@ public class TestIndicesEquals extends L
DocsAndPositionsEnum aprioriTermPositions = aprioriTermEnum.docsAndPositions(MultiFields.getDeletedDocs(aprioriReader), null);
DocsAndPositionsEnum testTermPositions = testTermEnum.docsAndPositions(MultiFields.getDeletedDocs(testReader), null);
+ if (VERBOSE) {
+ System.out.println("TEST: enum1=" + aprioriTermPositions + " enum2=" + testTermPositions);
+ }
if (aprioriTermPositions != null) {
for (int docIndex = 0; docIndex < aprioriReader.maxDoc(); docIndex++) {
boolean hasNext = aprioriTermPositions.nextDoc() != DocsEnum.NO_MORE_DOCS;
if (hasNext) {
assertTrue(testTermPositions.nextDoc() != DocsEnum.NO_MORE_DOCS);
+
+ if (VERBOSE) {
+ System.out.println("TEST: verify doc=" + aprioriTermPositions.docID());
+ }
assertEquals(aprioriTermPositions.freq(), testTermPositions.freq());
@@ -458,6 +482,10 @@ public class TestIndicesEquals extends L
int aprioriPos = aprioriTermPositions.nextPosition();
int testPos = testTermPositions.nextPosition();
+ if (VERBOSE) {
+ System.out.println("TEST: verify pos=" + aprioriPos);
+ }
+
assertEquals(aprioriPos, testPos);
assertEquals(aprioriTermPositions.hasPayload(), testTermPositions.hasPayload());
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/BulkPostingsEnum.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/BulkPostingsEnum.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/BulkPostingsEnum.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/BulkPostingsEnum.java Fri Jan 14 12:03:22 2011
@@ -65,6 +65,8 @@ public abstract class BulkPostingsEnum {
* always 0. */
public abstract int fill() throws IOException;
+ // nocommit -- need a skip(int count)
+
/** End index plus 1 of valid data in the buffer. */
public abstract int end();
@@ -102,5 +104,5 @@ public abstract class BulkPostingsEnum {
* skipping is not possible, ie you should just scan
* yourself). The returned JumpResult will only be valid until
* the next call to jump. */
- abstract public JumpResult jump(int target, int curCount) throws IOException;
+ abstract public JumpResult jump(int targetDocID, int curCount) throws IOException;
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PostingsConsumer.java Fri Jan 14 12:03:22 2011
@@ -30,9 +30,9 @@ import org.apache.lucene.util.BytesRef;
public abstract class PostingsConsumer {
- /** Adds a new doc in this term. Return null if this
- * consumer doesn't need to see the positions for this
- * doc. */
+ /** Adds a new doc in this term. If this field omits term
+ * freqs & positions then termDocFreq should be ignored,
+ * and, finishDoc will not be called. */
public abstract void startDoc(int docID, int termDocFreq) throws IOException;
public static class PostingsMergeState {
@@ -49,7 +49,8 @@ public abstract class PostingsConsumer {
public abstract void addPosition(int position, BytesRef payload) throws IOException;
/** Called when we are done adding positions & payloads
- * for each doc */
+ * for each doc. Not called when the field omits term
+ * freq and positions. */
public abstract void finishDoc() throws IOException;
/** Default merge impl: append documents, mapping around
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PrefixCodedTermsReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PrefixCodedTermsReader.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PrefixCodedTermsReader.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/PrefixCodedTermsReader.java Fri Jan 14 12:03:22 2011
@@ -357,7 +357,7 @@ public class PrefixCodedTermsReader exte
fieldTerm.term = term;
cachedState = termsCache.get(fieldTerm);
if (cachedState != null) {
- state.copy(cachedState);
+ state.copyFrom(cachedState);
seekPending = true;
positioned = false;
bytesReader.term.copy(term);
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/TermState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/TermState.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/TermState.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/TermState.java Fri Jan 14 12:03:22 2011
@@ -33,7 +33,7 @@ public class TermState implements Clonea
public long filePointer; // fp into the terms dict primary file (_X.tis)
public int docFreq; // how many docs have this term
- public void copy(TermState other) {
+ public void copyFrom(TermState other) {
ord = other.ord;
filePointer = other.filePointer;
docFreq = other.docFreq;
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsReaderImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsReaderImpl.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsReaderImpl.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsReaderImpl.java Fri Jan 14 12:03:22 2011
@@ -19,14 +19,13 @@ package org.apache.lucene.index.codecs.p
import java.io.IOException;
+import org.apache.lucene.index.DocsAndPositionsEnum;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.BulkPostingsEnum;
import org.apache.lucene.index.FieldInfo;
-import org.apache.lucene.index.DocsAndPositionsEnum;
-import org.apache.lucene.index.codecs.TermState;
import org.apache.lucene.index.codecs.PostingsReaderBase;
-import org.apache.lucene.index.codecs.pulsing.PulsingPostingsWriterImpl.Document;
-import org.apache.lucene.index.codecs.pulsing.PulsingPostingsWriterImpl.Position;
+import org.apache.lucene.index.codecs.TermState;
+import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
@@ -45,7 +44,6 @@ public class PulsingPostingsReaderImpl e
// Fallback reader for non-pulsed terms:
final PostingsReaderBase wrappedPostingsReader;
- int maxPulsingDocFreq;
public PulsingPostingsReaderImpl(PostingsReaderBase wrappedPostingsReader) throws IOException {
this.wrappedPostingsReader = wrappedPostingsReader;
@@ -55,38 +53,50 @@ public class PulsingPostingsReaderImpl e
public void init(IndexInput termsIn) throws IOException {
CodecUtil.checkHeader(termsIn, PulsingPostingsWriterImpl.CODEC,
PulsingPostingsWriterImpl.VERSION_START, PulsingPostingsWriterImpl.VERSION_START);
- maxPulsingDocFreq = termsIn.readVInt();
wrappedPostingsReader.init(termsIn);
}
private static class PulsingTermState extends TermState {
- Document docs[];
+ private byte[] postings;
+ private int postingsSize; // -1 if this term was not inlined
private TermState wrappedTermState;
private boolean pendingIndexTerm;
+ @Override
public Object clone() {
PulsingTermState clone;
clone = (PulsingTermState) super.clone();
- clone.docs = docs.clone();
- for(int i=0;i<clone.docs.length;i++) {
- final Document doc = clone.docs[i];
- if (doc != null) {
- clone.docs[i] = (Document) doc.clone();
- }
+ if (postingsSize != -1) {
+ clone.postings = new byte[postingsSize];
+ System.arraycopy(postings, 0, clone.postings, 0, postingsSize);
+ } else {
+ assert wrappedTermState != null;
+ clone.wrappedTermState = (TermState) wrappedTermState.clone();
}
- clone.wrappedTermState = (TermState) wrappedTermState.clone();
return clone;
}
- public void copy(TermState _other) {
- super.copy(_other);
+ @Override
+ public void copyFrom(TermState _other) {
+ super.copyFrom(_other);
PulsingTermState other = (PulsingTermState) _other;
- pendingIndexTerm = other.pendingIndexTerm;
- wrappedTermState.copy(other.wrappedTermState);
- for(int i=0;i<docs.length;i++) {
- if (other.docs[i] != null) {
- docs[i] = (Document) other.docs[i].clone();
+ postingsSize = other.postingsSize;
+ if (other.postingsSize != -1) {
+ if (postings == null || postings.length < other.postingsSize) {
+ postings = new byte[ArrayUtil.oversize(other.postingsSize, 1)];
}
+ System.arraycopy(other.postings, 0, postings, 0, other.postingsSize);
+ } else {
+ wrappedTermState.copyFrom(other.wrappedTermState);
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (postingsSize == -1) {
+ return "PulsingTermState: not inlined";
+ } else {
+ return "PulsingTermState: inlined size=" + postingsSize;
}
}
}
@@ -95,7 +105,6 @@ public class PulsingPostingsReaderImpl e
public TermState newTermState() throws IOException {
PulsingTermState state = new PulsingTermState();
state.wrappedTermState = wrappedPostingsReader.newTermState();
- state.docs = new Document[maxPulsingDocFreq];
return state;
}
@@ -106,75 +115,19 @@ public class PulsingPostingsReaderImpl e
termState.pendingIndexTerm |= isIndexTerm;
- if (termState.docFreq <= maxPulsingDocFreq) {
+ // TODO: wasteful to use whole byte for this (need just a 1 bit);
+ if (termsIn.readByte() == 1) {
- // Inlined into terms dict -- read everything in
-
- // TODO: maybe only read everything in lazily? But
- // then we'd need to store length so we could seek
- // over it when docs/pos enum was not requested
-
- // TODO: it'd be better to share this encoding logic
- // in some inner codec that knows how to write a
- // single doc / single position, etc. This way if a
- // given codec wants to store other interesting
- // stuff, it could use this pulsing codec to do so
-
- int docID = 0;
- for(int i=0;i<termState.docFreq;i++) {
- Document doc = termState.docs[i];
- if (doc == null) {
- doc = termState.docs[i] = new Document();
- }
- final int code = termsIn.readVInt();
- if (fieldInfo.omitTermFreqAndPositions) {
- docID += code;
- doc.numPositions = 1;
- } else {
- docID += code>>>1;
- if ((code & 1) != 0) {
- doc.numPositions = 1;
- } else {
- doc.numPositions = termsIn.readVInt();
- }
-
- if (doc.numPositions > doc.positions.length) {
- doc.reallocPositions(doc.numPositions);
- }
-
- int position = 0;
- int payloadLength = -1;
-
- for(int j=0;j<doc.numPositions;j++) {
- final Position pos = doc.positions[j];
- final int code2 = termsIn.readVInt();
- if (fieldInfo.storePayloads) {
- position += code2 >>> 1;
- if ((code2 & 1) != 0) {
- payloadLength = termsIn.readVInt();
- }
-
- if (payloadLength > 0) {
- if (pos.payload == null) {
- pos.payload = new BytesRef();
- pos.payload.bytes = new byte[payloadLength];
- } else if (payloadLength > pos.payload.bytes.length) {
- pos.payload.grow(payloadLength);
- }
- pos.payload.length = payloadLength;
- termsIn.readBytes(pos.payload.bytes, 0, payloadLength);
- } else if (pos.payload != null) {
- pos.payload.length = 0;
- }
- } else {
- position += code2;
- }
- pos.pos = position;
- }
- }
- doc.docID = docID;
+ // Inlined into terms dict -- just read the byte[] blob in,
+ // but don't decode it now (we only decode when a DocsEnum
+ // or D&PEnum is pulled):
+ termState.postingsSize = termsIn.readVInt();
+ if (termState.postings == null || termState.postings.length < termState.postingsSize) {
+ termState.postings = new byte[ArrayUtil.oversize(termState.postingsSize, 1)];
}
+ termsIn.readBytes(termState.postings, 0, termState.postingsSize);
} else {
+ termState.postingsSize = -1;
termState.wrappedTermState.docFreq = termState.docFreq;
wrappedPostingsReader.readTerm(termsIn, fieldInfo, termState.wrappedTermState, termState.pendingIndexTerm);
termState.pendingIndexTerm = false;
@@ -186,14 +139,19 @@ public class PulsingPostingsReaderImpl e
@Override
public DocsEnum docs(FieldInfo field, TermState _termState, Bits skipDocs, DocsEnum reuse) throws IOException {
PulsingTermState termState = (PulsingTermState) _termState;
- if (termState.docFreq <= maxPulsingDocFreq) {
+ if (termState.postingsSize != -1) {
+ PulsingDocsEnum postings;
if (reuse instanceof PulsingDocsEnum) {
- return ((PulsingDocsEnum) reuse).reset(skipDocs, termState);
+ postings = (PulsingDocsEnum) reuse;
+ if (!postings.canReuse(field)) {
+ postings = new PulsingDocsEnum(field);
+ }
} else {
- PulsingDocsEnum docsEnum = new PulsingDocsEnum();
- return docsEnum.reset(skipDocs, termState);
+ postings = new PulsingDocsEnum(field);
}
+ return postings.reset(skipDocs, termState);
} else {
+ // TODO: not great that we lose reuse of PulsingDocsEnum in this case:
if (reuse instanceof PulsingDocsEnum) {
return wrappedPostingsReader.docs(field, termState.wrappedTermState, skipDocs, null);
} else {
@@ -207,33 +165,48 @@ public class PulsingPostingsReaderImpl e
@Override
public BulkPostingsEnum bulkPostings(FieldInfo field, TermState _termState, BulkPostingsEnum reuse, boolean doFreqs, boolean doPositions) throws IOException {
PulsingTermState termState = (PulsingTermState) _termState;
- if (termState.docFreq <= maxPulsingDocFreq) {
- if (reuse instanceof PulsingBulkPostingsEnum && ((PulsingBulkPostingsEnum) reuse).docDeltas.length == maxPulsingDocFreq) {
- return ((PulsingBulkPostingsEnum) reuse).reset(termState, doFreqs, doPositions);
- } else {
- PulsingBulkPostingsEnum postingsEnum = new PulsingBulkPostingsEnum(maxPulsingDocFreq);
- return postingsEnum.reset(termState, doFreqs, doPositions);
- }
- } else {
+ // nocommit -- other codecs should check too
+ if (doPositions && !doFreqs) {
+ throw new IllegalArgumentException("doFreqs must be true when doPositions is true");
+ }
+ if (termState.postingsSize != -1) {
+ PulsingBulkPostingsEnum postings;
if (reuse instanceof PulsingBulkPostingsEnum) {
- return wrappedPostingsReader.bulkPostings(field, termState.wrappedTermState, null, doFreqs, doPositions);
+ postings = (PulsingBulkPostingsEnum) reuse;
+ if (!postings.canReuse(field, doFreqs, doPositions)) {
+ postings = new PulsingBulkPostingsEnum(field, doFreqs, doPositions);
+ }
} else {
- return wrappedPostingsReader.bulkPostings(field, termState.wrappedTermState, reuse, doFreqs, doPositions);
+ postings = new PulsingBulkPostingsEnum(field, doFreqs, doPositions);
}
+ return postings.reset(termState);
+ } else {
+ return wrappedPostingsReader.bulkPostings(field, termState.wrappedTermState, reuse, doFreqs, doPositions);
}
}
// TODO: -- not great that we can't always reuse
@Override
public DocsAndPositionsEnum docsAndPositions(FieldInfo field, TermState _termState, Bits skipDocs, DocsAndPositionsEnum reuse) throws IOException {
- PulsingTermState termState = (PulsingTermState) _termState;
- if (termState.docFreq <= maxPulsingDocFreq) {
+ if (field.omitTermFreqAndPositions) {
+ return null;
+ }
+ //System.out.println("D&P: field=" + field.name);
+
+ final PulsingTermState termState = (PulsingTermState) _termState;
+
+ if (termState.postingsSize != -1) {
+ PulsingDocsAndPositionsEnum postings;
if (reuse instanceof PulsingDocsAndPositionsEnum) {
- return ((PulsingDocsAndPositionsEnum) reuse).reset(skipDocs, termState);
+ postings = (PulsingDocsAndPositionsEnum) reuse;
+ if (!postings.canReuse(field)) {
+ postings = new PulsingDocsAndPositionsEnum(field);
+ }
} else {
- PulsingDocsAndPositionsEnum postingsEnum = new PulsingDocsAndPositionsEnum();
- return postingsEnum.reset(skipDocs, termState);
+ postings = new PulsingDocsAndPositionsEnum(field);
}
+
+ return postings.reset(skipDocs, termState);
} else {
if (reuse instanceof PulsingDocsAndPositionsEnum) {
return wrappedPostingsReader.docsAndPositions(field, termState.wrappedTermState, skipDocs, null);
@@ -243,43 +216,90 @@ public class PulsingPostingsReaderImpl e
}
}
- static class PulsingDocsEnum extends DocsEnum {
- private int nextRead;
+ private static class PulsingDocsEnum extends DocsEnum {
+ private final ByteArrayDataInput postings = new ByteArrayDataInput(null);
+ private final boolean omitTF;
+ private final boolean storePayloads;
private Bits skipDocs;
- private Document doc;
- private PulsingTermState state;
+ private int docID;
+ private int freq;
- PulsingDocsEnum reset(Bits skipDocs, PulsingTermState termState) {
- // TODO: -- not great we have to clone here --
- // merging is wasteful; TermRangeQuery too
- state = (PulsingTermState) termState.clone();
+ public PulsingDocsEnum(FieldInfo fieldInfo) {
+ omitTF = fieldInfo.omitTermFreqAndPositions;
+ storePayloads = fieldInfo.storePayloads;
+ }
+
+ public PulsingDocsEnum reset(Bits skipDocs, PulsingTermState termState) {
+ //System.out.println("PR docsEnum termState=" + termState + " docFreq=" + termState.docFreq);
+ assert termState.postingsSize != -1;
+ final byte[] bytes = new byte[termState.postingsSize];
+ System.arraycopy(termState.postings, 0, bytes, 0, termState.postingsSize);
+ postings.reset(bytes);
+ docID = 0;
+ freq = 1;
this.skipDocs = skipDocs;
- nextRead = 0;
return this;
}
+ boolean canReuse(FieldInfo fieldInfo) {
+ return omitTF == fieldInfo.omitTermFreqAndPositions && storePayloads == fieldInfo.storePayloads;
+ }
+
@Override
- public int nextDoc() {
+ public int nextDoc() throws IOException {
+ //System.out.println("PR nextDoc this= "+ this);
while(true) {
- if (nextRead >= state.docFreq) {
+ if (postings.eof()) {
+ //System.out.println("PR END");
return NO_MORE_DOCS;
+ }
+
+ final int code = postings.readVInt();
+ if (omitTF) {
+ docID += code;
} else {
- doc = state.docs[nextRead++];
- if (skipDocs == null || !skipDocs.get(doc.docID)) {
- return doc.docID;
+ docID += code >>> 1; // shift off low bit
+ if ((code & 1) != 0) { // if low bit is set
+ freq = 1; // freq is one
+ } else {
+ freq = postings.readVInt(); // else read freq
}
+
+ // Skip positions
+ if (storePayloads) {
+ int payloadLength = -1;
+ for(int pos=0;pos<freq;pos++) {
+ final int posCode = postings.readVInt();
+ if ((posCode & 1) != 0) {
+ payloadLength = postings.readVInt();
+ }
+ if (payloadLength != 0) {
+ postings.skipBytes(payloadLength);
+ }
+ }
+ } else {
+ for(int pos=0;pos<freq;pos++) {
+ // TODO: skipVInt
+ postings.readVInt();
+ }
+ }
+ }
+
+ if (skipDocs == null || !skipDocs.get(docID)) {
+ //System.out.println(" return docID=" + docID + " freq=" + freq);
+ return docID;
}
}
}
@Override
public int freq() {
- return doc.numPositions;
+ return freq;
}
@Override
public int docID() {
- return doc.docID;
+ return docID;
}
@Override
@@ -293,52 +313,119 @@ public class PulsingPostingsReaderImpl e
}
}
- static class PulsingBulkPostingsEnum extends BulkPostingsEnum {
- private Document doc;
- private PulsingTermState state;
- int numDocs;
- final int[] docDeltas;
- final int[] freqs;
- int[] positionDeltas;
+ private static class PulsingBulkPostingsEnum extends BulkPostingsEnum {
+ private final ByteArrayDataInput postings = new ByteArrayDataInput(null);
+ private final boolean omitTF;
+ private final boolean storePayloads;
+
+ private int[] docDeltas;
+ private int[] freqs;
+ private int[] positionDeltas;
int numPositions;
- private boolean doFreqs;
- private boolean doPositions;
+ int numDocs;
+
+ public PulsingBulkPostingsEnum(FieldInfo field, boolean doFreq, boolean doPositions) {
+ omitTF = field.omitTermFreqAndPositions;
+ storePayloads = field.storePayloads;
+ docDeltas = new int[4];
+ if (doFreq && !omitTF) {
+ freqs = new int[4];
+ if (doPositions) {
+ positionDeltas = new int[4];
+ } else {
+ positionDeltas = null;
+ }
+ } else {
+ freqs = null;
+ positionDeltas = null;
+ }
+ }
- public PulsingBulkPostingsEnum(int maxFreq) {
- docDeltas = new int[maxFreq];
- freqs = new int[maxFreq];
- positionDeltas = new int[maxFreq];
+ public boolean canReuse(FieldInfo field, boolean doFreq, boolean doPositions) {
+ return (freqs != null) == doFreq &&
+ (positionDeltas != null) == doPositions &&
+ this.storePayloads == field.storePayloads &&
+ this.omitTF == field.omitTermFreqAndPositions;
}
- PulsingBulkPostingsEnum reset(PulsingTermState termState, boolean doFreqs, boolean doPositions) {
- numDocs = termState.docFreq;
- this.doFreqs = doFreqs;
- this.doPositions = doPositions;
- assert numDocs <= docDeltas.length;
- int lastDocID = 0;
+ PulsingBulkPostingsEnum reset(PulsingTermState termState) throws IOException {
+ // Fully decode the byte[] blob on reset:
+ assert termState.postingsSize != -1;
+ postings.reset(termState.postings, 0, termState.postingsSize);
+ numDocs = 0;
numPositions = 0;
- for(int i=0;i<numDocs;i++) {
- final int docID = termState.docs[i].docID;
- docDeltas[i] = docID - lastDocID;
- if (doFreqs) {
- freqs[i] = termState.docs[i].numPositions;
- assert freqs[i] > 0;
- if (doPositions) {
- final Position[] positions = termState.docs[i].positions;
- int lastPos = 0;
- for(int posIndex=0;posIndex<freqs[i];posIndex++) {
- if (positionDeltas.length == numPositions) {
- positionDeltas = ArrayUtil.grow(positionDeltas, 1+numPositions);
+ int payloadLength = 0;
+ while (!postings.eof()) {
+
+ if (numDocs == docDeltas.length) {
+ docDeltas = ArrayUtil.grow(docDeltas);
+ if (freqs != null) {
+ freqs = ArrayUtil.grow(freqs);
+ }
+ }
+
+ final int code = postings.readVInt();
+ if (omitTF) {
+ docDeltas[numDocs] = code;
+ } else {
+ docDeltas[numDocs] = code >>> 1;
+ final int freq;
+ if ((code & 1) != 0) {
+ freq = 1;
+ } else {
+ freq = postings.readVInt();
+ }
+ if (freqs != null) {
+ freqs[numDocs] = freq;
+ }
+
+ if (positionDeltas != null) {
+
+ final int limit = numPositions + freq;
+ if (positionDeltas.length < limit) {
+ positionDeltas = ArrayUtil.grow(positionDeltas, limit);
+ }
+
+ if (!storePayloads) {
+ while(numPositions < limit) {
+ positionDeltas[numPositions++] = postings.readVInt();
+ }
+ } else {
+ while(numPositions < limit) {
+ final int posCode = postings.readVInt();
+ positionDeltas[numPositions++] = posCode >> 1;
+ if ((posCode & 1) != 0) {
+ payloadLength = postings.readVInt();
+ }
+ if (payloadLength != 0) {
+ postings.skipBytes(payloadLength);
+ }
+ }
+ }
+ } else {
+ // Skip positions:
+ if (!storePayloads) {
+ for(int skipCount=0;skipCount<freq;skipCount++) {
+ postings.readVInt();
+ }
+ } else {
+ for(int skipCount=0;skipCount<freq;skipCount++) {
+ final int posCode = postings.readVInt();
+ if ((posCode & 1) != 0) {
+ payloadLength = postings.readVInt();
+ }
+ if (payloadLength != 0) {
+ postings.skipBytes(payloadLength);
+ }
}
- final int pos = positions[posIndex].pos;
- positionDeltas[numPositions++] = pos - lastPos;
- lastPos = pos;
}
}
}
- lastDocID = docID;
+ numDocs++;
}
+ assert numDocs == termState.docFreq: "numDocs=" + numDocs + " vs " + termState.docFreq;
+
return this;
}
@@ -403,7 +490,7 @@ public class PulsingPostingsReaderImpl e
@Override
public BulkPostingsEnum.BlockReader getFreqsReader() {
- return doFreqs ? freqsReader: null;
+ return freqs != null ? freqsReader: null;
}
private final BulkPostingsEnum.BlockReader positionDeltasReader = new BulkPostingsEnum.BlockReader() {
@@ -435,64 +522,90 @@ public class PulsingPostingsReaderImpl e
@Override
public BulkPostingsEnum.BlockReader getPositionDeltasReader() {
- return doPositions ? positionDeltasReader : null;
+ return positionDeltas != null ? positionDeltasReader : null;
}
@Override
public JumpResult jump(int target, int curCount) throws IOException {
// TODO: advance is likely unhelpful since apps
- // "usually" set a lowish docFreq cutoff
+ // "usually" set a lowish totalTermFreq cutoff
return null;
}
}
- static class PulsingDocsAndPositionsEnum extends DocsAndPositionsEnum {
- private int nextRead;
- private int nextPosRead;
+ private static class PulsingDocsAndPositionsEnum extends DocsAndPositionsEnum {
+ private final ByteArrayDataInput postings = new ByteArrayDataInput(null);
+ private final boolean storePayloads;
+
private Bits skipDocs;
- private Document doc;
- private Position pos;
- private PulsingTermState state;
+ private int docID;
+ private int freq;
+ private int posPending;
+ private int position;
+ private int payloadLength;
+ private BytesRef payload;
- // Only here to emulate limitation of standard codec,
- // which only allows retrieving payload more than once
private boolean payloadRetrieved;
- public void close() {}
+ public PulsingDocsAndPositionsEnum(FieldInfo fieldInfo) {
+ storePayloads = fieldInfo.storePayloads;
+ }
+
+ boolean canReuse(FieldInfo fieldInfo) {
+ return storePayloads == fieldInfo.storePayloads;
+ }
- PulsingDocsAndPositionsEnum reset(Bits skipDocs, PulsingTermState termState) {
- // TODO: -- not great we have to clone here --
- // merging is wasteful; TermRangeQuery too
- state = (PulsingTermState) termState.clone();
+ public PulsingDocsAndPositionsEnum reset(Bits skipDocs, PulsingTermState termState) {
+ assert termState.postingsSize != -1;
+ final byte[] bytes = new byte[termState.postingsSize];
+ System.arraycopy(termState.postings, 0, bytes, 0, termState.postingsSize);
+ postings.reset(bytes);
this.skipDocs = skipDocs;
- nextRead = 0;
- nextPosRead = 0;
+ payloadLength = 0;
+ docID = 0;
+ //System.out.println("PR d&p reset storesPayloads=" + storePayloads + " bytes=" + bytes.length + " this=" + this);
return this;
}
@Override
- public int nextDoc() {
+ public int nextDoc() throws IOException {
+ //System.out.println("PR d&p nextDoc this=" + this);
+
while(true) {
- if (nextRead >= state.docFreq) {
+ //System.out.println(" cycle skip posPending=" + posPending);
+
+ skipPositions();
+
+ if (postings.eof()) {
+ //System.out.println("PR END");
return NO_MORE_DOCS;
+ }
+
+ final int code = postings.readVInt();
+ docID += code >>> 1; // shift off low bit
+ if ((code & 1) != 0) { // if low bit is set
+ freq = 1; // freq is one
} else {
- doc = state.docs[nextRead++];
- if (skipDocs == null || !skipDocs.get(doc.docID)) {
- nextPosRead = 0;
- return doc.docID;
- }
+ freq = postings.readVInt(); // else read freq
+ }
+ posPending = freq;
+
+ if (skipDocs == null || !skipDocs.get(docID)) {
+ //System.out.println(" return docID=" + docID + " freq=" + freq);
+ position = 0;
+ return docID;
}
}
}
@Override
public int freq() {
- return doc.numPositions;
+ return freq;
}
@Override
public int docID() {
- return doc.docID;
+ return docID;
}
@Override
@@ -507,22 +620,68 @@ public class PulsingPostingsReaderImpl e
}
@Override
- public int nextPosition() {
- assert nextPosRead < doc.numPositions;
- pos = doc.positions[nextPosRead++];
- payloadRetrieved = false;
- return pos.pos;
+ public int nextPosition() throws IOException {
+ //System.out.println("PR d&p nextPosition posPending=" + posPending + " vs freq=" + freq);
+
+ assert posPending > 0;
+ posPending--;
+
+ if (storePayloads) {
+ if (!payloadRetrieved) {
+ //System.out.println("PR skip payload=" + payloadLength);
+ postings.skipBytes(payloadLength);
+ }
+ final int code = postings.readVInt();
+ //System.out.println("PR code=" + code);
+ if ((code & 1) != 0) {
+ payloadLength = postings.readVInt();
+ //System.out.println("PR new payload len=" + payloadLength);
+ }
+ position += code >> 1;
+ payloadRetrieved = false;
+ } else {
+ position += postings.readVInt();
+ }
+
+ //System.out.println("PR d&p nextPos return pos=" + position + " this=" + this);
+ return position;
+ }
+
+ private void skipPositions() throws IOException {
+ while(posPending != 0) {
+ nextPosition();
+ }
+ if (storePayloads && !payloadRetrieved) {
+ //System.out.println(" skip payload len=" + payloadLength);
+ postings.skipBytes(payloadLength);
+ payloadRetrieved = true;
+ }
}
@Override
public boolean hasPayload() {
- return !payloadRetrieved && pos.payload != null && pos.payload.length > 0;
+ return storePayloads && !payloadRetrieved && payloadLength > 0;
}
@Override
- public BytesRef getPayload() {
+ public BytesRef getPayload() throws IOException {
+ //System.out.println("PR getPayload payloadLength=" + payloadLength + " this=" + this);
+ if (payloadRetrieved) {
+ throw new IOException("Either no payload exists at this term position or an attempt was made to load it more than once.");
+ }
payloadRetrieved = true;
- return pos.payload;
+ if (payloadLength > 0) {
+ if (payload == null) {
+ payload = new BytesRef(payloadLength);
+ } else {
+ payload.grow(payloadLength);
+ }
+ postings.readBytes(payload.bytes, 0, payloadLength);
+ payload.length = payloadLength;
+ return payload;
+ } else {
+ return null;
+ }
}
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsWriterImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsWriterImpl.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsWriterImpl.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/pulsing/PulsingPostingsWriterImpl.java Fri Jan 14 12:03:22 2011
@@ -20,12 +20,11 @@ package org.apache.lucene.index.codecs.p
import java.io.IOException;
import org.apache.lucene.index.FieldInfo;
-import org.apache.lucene.util.CodecUtil;
import org.apache.lucene.index.codecs.PostingsWriterBase;
import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.CodecUtil;
// TODO: we now pulse entirely according to docFreq of the
// term; it might be better to eg pulse by "net bytes used"
@@ -44,69 +43,21 @@ public final class PulsingPostingsWriter
final static int VERSION_CURRENT = VERSION_START;
- IndexOutput termsOut;
-
- boolean omitTF;
- boolean storePayloads;
-
- // Starts a new term
- FieldInfo fieldInfo;
-
- /** @lucene.experimental */
- public static class Document {
- int docID;
- int termDocFreq;
- int numPositions;
- Position[] positions;
- Document() {
- positions = new Position[1];
- positions[0] = new Position();
- }
-
- @Override
- public Object clone() {
- Document doc = new Document();
- doc.docID = docID;
- // nocommit -- aren't termDocFreq and numPositions the
- // same thing???
- doc.termDocFreq = termDocFreq;
- doc.numPositions = numPositions;
- doc.positions = new Position[positions.length];
- for(int i = 0; i < positions.length; i++) {
- doc.positions[i] = (Position) positions[i].clone();
- }
-
- return doc;
- }
+ private IndexOutput termsOut;
- void reallocPositions(int minSize) {
- final Position[] newArray = new Position[ArrayUtil.oversize(minSize, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
- System.arraycopy(positions, 0, newArray, 0, positions.length);
- for(int i=positions.length;i<newArray.length;i++) {
- newArray[i] = new Position();
- }
- positions = newArray;
- }
- }
+ private boolean omitTF;
+ private boolean storePayloads;
- final Document[] pendingDocs;
- int pendingDocCount = 0;
- Document currentDoc;
- boolean pulsed; // false if we've seen > maxPulsingDocFreq docs
+ // one entry per position
+ private final Position[] pending;
+ private int pendingCount = 0; // -1 once we've hit too many positions
+ private Position currentDoc; // first Position entry of current doc
- static class Position {
+ private static final class Position {
BytesRef payload;
+ int termFreq; // only incremented on first position for a given doc
int pos;
-
- @Override
- public Object clone() {
- Position position = new Position();
- position.pos = pos;
- if (payload != null) {
- position.payload = new BytesRef(payload);
- }
- return position;
- }
+ int docID;
}
// TODO: -- lazy init this? ie, if every single term
@@ -114,18 +65,19 @@ public final class PulsingPostingsWriter
// Fallback writer for non-pulsed terms:
final PostingsWriterBase wrappedPostingsWriter;
- /** If docFreq <= maxPulsingDocFreq, its postings are
+ /** If the total number of positions (summed across all docs
+ * for this term) is <= maxPositions, then the postings are
* inlined into terms dict */
- public PulsingPostingsWriterImpl(int maxPulsingDocFreq, PostingsWriterBase wrappedPostingsWriter) throws IOException {
+ public PulsingPostingsWriterImpl(int maxPositions, PostingsWriterBase wrappedPostingsWriter) throws IOException {
super();
- pendingDocs = new Document[maxPulsingDocFreq];
- for(int i=0;i<maxPulsingDocFreq;i++) {
- pendingDocs[i] = new Document();
+ pending = new Position[maxPositions];
+ for(int i=0;i<maxPositions;i++) {
+ pending[i] = new Position();
}
// We simply wrap another postings writer, but only call
- // on it when doc freq is higher than our cutoff
+ // on it when tot positions is >= the cutoff:
this.wrappedPostingsWriter = wrappedPostingsWriter;
}
@@ -133,14 +85,13 @@ public final class PulsingPostingsWriter
public void start(IndexOutput termsOut) throws IOException {
this.termsOut = termsOut;
CodecUtil.writeHeader(termsOut, CODEC, VERSION_CURRENT);
- termsOut.writeVInt(pendingDocs.length);
wrappedPostingsWriter.start(termsOut);
}
@Override
public void startTerm() {
- assert pendingDocCount == 0;
- pulsed = false;
+ //System.out.println("PW startTerm");
+ assert pendingCount == 0;
}
// TODO: -- should we NOT reuse across fields? would
@@ -150,7 +101,7 @@ public final class PulsingPostingsWriter
// our parent calls setField whenever the field changes
@Override
public void setField(FieldInfo fieldInfo) {
- this.fieldInfo = fieldInfo;
+ //System.out.println("PW field=" + fieldInfo.name);
omitTF = fieldInfo.omitTermFreqAndPositions;
storePayloads = fieldInfo.storePayloads;
wrappedPostingsWriter.setField(fieldInfo);
@@ -158,65 +109,47 @@ public final class PulsingPostingsWriter
@Override
public void startDoc(int docID, int termDocFreq) throws IOException {
-
assert docID >= 0: "got docID=" + docID;
-
- if (!pulsed && pendingDocCount == pendingDocs.length) {
-
- // OK we just crossed the threshold, this term should
- // now be written with our wrapped codec:
- wrappedPostingsWriter.startTerm();
-
- // Flush all buffered docs
- for(int i=0;i<pendingDocCount;i++) {
- final Document doc = pendingDocs[i];
-
- wrappedPostingsWriter.startDoc(doc.docID, doc.termDocFreq);
-
- if (!omitTF) {
- assert doc.termDocFreq == doc.numPositions;
- for(int j=0;j<doc.termDocFreq;j++) {
- final Position pos = doc.positions[j];
- if (pos.payload != null && pos.payload.length > 0) {
- assert storePayloads;
- wrappedPostingsWriter.addPosition(pos.pos, pos.payload);
- } else {
- wrappedPostingsWriter.addPosition(pos.pos, null);
- }
- }
- wrappedPostingsWriter.finishDoc();
- }
- }
+ //System.out.println("PW doc=" + docID);
- pendingDocCount = 0;
-
- pulsed = true;
+ if (pendingCount == pending.length) {
+ push();
+ wrappedPostingsWriter.finishDoc();
}
- if (pulsed) {
+ if (pendingCount != -1) {
+ assert pendingCount < pending.length;
+ currentDoc = pending[pendingCount];
+ currentDoc.docID = docID;
+ if (omitTF) {
+ pendingCount++;
+ } else {
+ currentDoc.termFreq = termDocFreq;
+ }
+ } else {
// We've already seen too many docs for this term --
// just forward to our fallback writer
wrappedPostingsWriter.startDoc(docID, termDocFreq);
- } else {
- currentDoc = pendingDocs[pendingDocCount++];
- currentDoc.docID = docID;
- // TODO: -- need not store in doc? only used for alloc & assert
- currentDoc.termDocFreq = termDocFreq;
- if (termDocFreq > currentDoc.positions.length) {
- currentDoc.reallocPositions(termDocFreq);
- }
- currentDoc.numPositions = 0;
}
}
@Override
public void addPosition(int position, BytesRef payload) throws IOException {
- if (pulsed) {
+
+ //System.out.println("PW pos=" + position + " payload=" + (payload == null ? "null" : payload.length + " bytes"));
+ if (pendingCount == pending.length) {
+ push();
+ }
+
+ if (pendingCount == -1) {
+ // We've already seen too many docs for this term --
+ // just forward to our fallback writer
wrappedPostingsWriter.addPosition(position, payload);
} else {
- // just buffer up
- Position pos = currentDoc.positions[currentDoc.numPositions++];
+ // buffer up
+ final Position pos = pending[pendingCount++];
pos.pos = position;
+ pos.docID = currentDoc.docID;
if (payload != null && payload.length > 0) {
if (pos.payload == null) {
pos.payload = new BytesRef(payload);
@@ -231,86 +164,137 @@ public final class PulsingPostingsWriter
@Override
public void finishDoc() throws IOException {
- assert omitTF || currentDoc.numPositions == currentDoc.termDocFreq;
- if (pulsed) {
+ //System.out.println("PW finishDoc");
+ if (pendingCount == -1) {
wrappedPostingsWriter.finishDoc();
}
}
- boolean pendingIsIndexTerm;
+ private boolean pendingIsIndexTerm;
- int pulsedCount;
- int nonPulsedCount;
+ private final RAMOutputStream buffer = new RAMOutputStream();
/** Called when we are done adding docs to this term */
@Override
public void finishTerm(int docCount, boolean isIndexTerm) throws IOException {
+ //System.out.println("PW finishTerm docCount=" + docCount);
- assert docCount > 0;
+ assert pendingCount > 0 || pendingCount == -1;
pendingIsIndexTerm |= isIndexTerm;
- if (pulsed) {
+ if (pendingCount == -1) {
+ termsOut.writeByte((byte) 0);
wrappedPostingsWriter.finishTerm(docCount, pendingIsIndexTerm);
pendingIsIndexTerm = false;
- pulsedCount++;
} else {
- nonPulsedCount++;
- // OK, there were few enough occurrences for this
+
+ // There were few enough total occurrences for this
// term, so we fully inline our postings data into
// terms dict, now:
- int lastDocID = 0;
- for(int i=0;i<pendingDocCount;i++) {
- final Document doc = pendingDocs[i];
- final int delta = doc.docID - lastDocID;
- lastDocID = doc.docID;
- if (omitTF) {
- termsOut.writeVInt(delta);
- } else {
- assert doc.numPositions == doc.termDocFreq;
- if (doc.numPositions == 1)
- termsOut.writeVInt((delta<<1)|1);
- else {
- termsOut.writeVInt(delta<<1);
- termsOut.writeVInt(doc.numPositions);
+
+ termsOut.writeByte((byte) 1);
+
+ // TODO: it'd be better to share this encoding logic
+ // in some inner codec that knows how to write a
+ // single doc / single position, etc. This way if a
+ // given codec wants to store other interesting
+ // stuff, it could use this pulsing codec to do so
+
+ if (!omitTF) {
+ int lastDocID = 0;
+ int pendingIDX = 0;
+ while(pendingIDX < pendingCount) {
+ final Position doc = pending[pendingIDX];
+
+ final int delta = doc.docID - lastDocID;
+ lastDocID = doc.docID;
+
+ //System.out.println(" write doc=" + doc.docID + " freq=" + doc.termFreq);
+
+ if (doc.termFreq == 1) {
+ buffer.writeVInt((delta<<1)|1);
+ } else {
+ buffer.writeVInt(delta<<1);
+ buffer.writeVInt(doc.termFreq);
}
- // TODO: we could do better in encoding
- // payloadLength, eg, if it's always the same
- // across all terms
- int lastPosition = 0;
+ int lastPos = 0;
int lastPayloadLength = -1;
-
- for(int j=0;j<doc.numPositions;j++) {
- final Position pos = doc.positions[j];
- final int delta2 = pos.pos - lastPosition;
- lastPosition = pos.pos;
+ for(int posIDX=0;posIDX<doc.termFreq;posIDX++) {
+ final Position pos = pending[pendingIDX++];
+ assert pos.docID == doc.docID;
+ final int posDelta = pos.pos - lastPos;
+ lastPos = pos.pos;
+ //System.out.println(" write pos=" + pos.pos);
if (storePayloads) {
final int payloadLength = pos.payload == null ? 0 : pos.payload.length;
if (payloadLength != lastPayloadLength) {
- termsOut.writeVInt((delta2 << 1)|1);
- termsOut.writeVInt(payloadLength);
+ buffer.writeVInt((posDelta << 1)|1);
+ buffer.writeVInt(payloadLength);
lastPayloadLength = payloadLength;
} else {
- termsOut.writeVInt(delta2 << 1);
+ buffer.writeVInt(posDelta << 1);
}
-
if (payloadLength > 0) {
- termsOut.writeBytes(pos.payload.bytes, 0, pos.payload.length);
+ buffer.writeBytes(pos.payload.bytes, 0, pos.payload.length);
}
} else {
- termsOut.writeVInt(delta2);
+ buffer.writeVInt(posDelta);
}
}
}
+ } else {
+ int lastDocID = 0;
+ for(int posIDX=0;posIDX<pendingCount;posIDX++) {
+ final Position doc = pending[posIDX];
+ buffer.writeVInt(doc.docID - lastDocID);
+ lastDocID = doc.docID;
+ }
}
+
+ //System.out.println(" bytes=" + buffer.getFilePointer());
+ termsOut.writeVInt((int) buffer.getFilePointer());
+ buffer.writeTo(termsOut);
+ buffer.reset();
}
- pendingDocCount = 0;
+ pendingCount = 0;
}
@Override
public void close() throws IOException {
wrappedPostingsWriter.close();
}
+
+ // Pushes pending positions to the wrapped codec
+ private void push() throws IOException {
+ //System.out.println("PW now push @ " + pendingCount);
+ assert pendingCount == pending.length;
+
+ wrappedPostingsWriter.startTerm();
+
+ // Flush all buffered docs
+ if (!omitTF) {
+ Position doc = null;
+ for(Position pos : pending) {
+ if (doc == null) {
+ doc = pos;
+ wrappedPostingsWriter.startDoc(doc.docID, doc.termFreq);
+ } else if (doc.docID != pos.docID) {
+ assert pos.docID > doc.docID;
+ wrappedPostingsWriter.finishDoc();
+ doc = pos;
+ wrappedPostingsWriter.startDoc(doc.docID, doc.termFreq);
+ }
+ wrappedPostingsWriter.addPosition(pos.pos, pos.payload);
+ }
+ //wrappedPostingsWriter.finishDoc();
+ } else {
+ for(Position doc : pending) {
+ wrappedPostingsWriter.startDoc(doc.docID, 0);
+ }
+ }
+ pendingCount = -1;
+ }
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/sep/SepPostingsReaderImpl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/sep/SepPostingsReaderImpl.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/sep/SepPostingsReaderImpl.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/sep/SepPostingsReaderImpl.java Fri Jan 14 12:03:22 2011
@@ -142,8 +142,8 @@ public class SepPostingsReaderImpl exten
return other;
}
- public void copy(TermState _other) {
- super.copy(_other);
+ public void copyFrom(TermState _other) {
+ super.copyFrom(_other);
SepTermState other = (SepTermState) _other;
docIndex.set(other.docIndex);
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/standard/StandardPostingsReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/standard/StandardPostingsReader.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/standard/StandardPostingsReader.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/index/codecs/standard/StandardPostingsReader.java Fri Jan 14 12:03:22 2011
@@ -91,12 +91,12 @@ public class StandardPostingsReader exte
public Object clone() {
DocTermState other = new DocTermState();
- other.copy(this);
+ other.copyFrom(this);
return other;
}
- public void copy(TermState _other) {
- super.copy(_other);
+ public void copyFrom(TermState _other) {
+ super.copyFrom(_other);
DocTermState other = (DocTermState) _other;
freqOffset = other.freqOffset;
proxOffset = other.proxOffset;
@@ -778,6 +778,7 @@ public class StandardPostingsReader exte
if (payloadLength > payload.bytes.length) {
payload.grow(payloadLength);
}
+
proxIn.readBytes(payload.bytes, 0, payloadLength);
payload.length = payloadLength;
payloadPending = false;
@@ -851,7 +852,7 @@ public class StandardPostingsReader exte
storePayloads == fieldInfo.storePayloads;
}
- final void read() throws IOException {
+ void read() throws IOException {
final int left = docFreq - ord;
final int limit = left > BULK_BUFFER_SIZE ? BULK_BUFFER_SIZE : left;
if (freqsReader == null) {
Copied: lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java (from r1057897, lucene/dev/trunk/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java)
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java?p2=lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java&p1=lucene/dev/trunk/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java&r1=1057897&r2=1058973&rev=1058973&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/java/org/apache/lucene/store/ByteArrayDataInput.java Fri Jan 14 12:03:22 2011
@@ -22,19 +22,28 @@ public final class ByteArrayDataInput ex
private byte[] bytes;
private int pos;
+ private int limit;
// TODO: allow BytesRef (slice) too
public ByteArrayDataInput(byte[] bytes) {
this.bytes = bytes;
+ if (bytes != null) {
+ limit = bytes.length;
+ }
}
public void reset(byte[] bytes) {
+ reset(bytes, 0, bytes.length);
+ }
+
+ public void reset(byte[] bytes, int offset, int limit) {
this.bytes = bytes;
- pos = 0;
+ pos = offset;
+ this.limit = limit;
}
public boolean eof() {
- return pos == bytes.length;
+ return pos == limit;
}
public void skipBytes(int count) {
Modified: lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestIndexReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestIndexReader.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestIndexReader.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestIndexReader.java Fri Jan 14 12:03:22 2011
@@ -329,6 +329,7 @@ public class TestIndexReader extends Luc
// add 100 documents with term : aaa
writer = new IndexWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()));
+ writer.setInfoStream(VERBOSE ? System.out : null);
for (int i = 0; i < 100; i++) {
addDoc(writer, searchTerm.text());
}
Modified: lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/TestNRTThreads.java Fri Jan 14 12:03:22 2011
@@ -349,6 +349,7 @@ public class TestNRTThreads extends Luce
assertEquals("index=" + writer.segString() + " addCount=" + addCount + " delCount=" + delCount, addCount.get() - delCount.get(), writer.numDocs());
writer.close(false);
+ _TestUtil.checkIndex(dir);
dir.close();
_TestUtil.rmDir(tempDir);
docs.close();
Modified: lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/codecs/mockrandom/MockRandomCodec.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/codecs/mockrandom/MockRandomCodec.java?rev=1058973&r1=1058972&r2=1058973&view=diff
==============================================================================
--- lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/codecs/mockrandom/MockRandomCodec.java (original)
+++ lucene/dev/branches/bulkpostings/lucene/src/test/org/apache/lucene/index/codecs/mockrandom/MockRandomCodec.java Fri Jan 14 12:03:22 2011
@@ -108,11 +108,11 @@ public class MockRandomCodec extends Cod
}
if (random.nextBoolean()) {
- final int freqCutoff = _TestUtil.nextInt(random, 1, 20);
+ final int totTFCutoff = _TestUtil.nextInt(random, 1, 20);
if (LuceneTestCase.VERBOSE) {
- System.out.println("MockRandomCodec: pulsing postings with freqCutoff=" + freqCutoff);
+ System.out.println("MockRandomCodec: pulsing postings with totTFCutoff=" + totTFCutoff);
}
- postingsWriter = new PulsingPostingsWriterImpl(freqCutoff, postingsWriter);
+ postingsWriter = new PulsingPostingsWriterImpl(totTFCutoff, postingsWriter);
}
final TermsIndexWriterBase indexWriter;
@@ -218,9 +218,9 @@ public class MockRandomCodec extends Cod
}
if (random.nextBoolean()) {
- final int freqCutoff = _TestUtil.nextInt(random, 1, 20);
+ final int totTFCutoff = _TestUtil.nextInt(random, 1, 20);
if (LuceneTestCase.VERBOSE) {
- System.out.println("MockRandomCodec: reading pulsing postings with freqCutoff=" + freqCutoff);
+ System.out.println("MockRandomCodec: reading pulsing postings with totTFCutoff=" + totTFCutoff);
}
postingsReader = new PulsingPostingsReaderImpl(postingsReader);
}