Posted to commits@lucene.apache.org by mi...@apache.org on 2011/02/01 00:35:03 UTC
svn commit: r1065855 - in /lucene/dev/trunk: lucene/
lucene/src/java/org/apache/lucene/index/
lucene/src/java/org/apache/lucene/index/codecs/
lucene/src/test/org/apache/lucene/index/ modules/benchmark/conf/
Author: mikemccand
Date: Mon Jan 31 23:35:02 2011
New Revision: 1065855
URL: http://svn.apache.org/viewvc?rev=1065855&view=rev
Log:
LUCENE-2897: apply delete-by-term to the flushed segment while we flush (still buffer delete-by-term for past segments)
Added:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (with props)
Modified:
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentWriteState.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/codecs/BlockTermsReader.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestAddIndexes.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestCodecs.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
lucene/dev/trunk/modules/benchmark/conf/createLineFile.alg
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Mon Jan 31 23:35:02 2011
@@ -330,6 +330,9 @@ Optimizations
seek the term dictionary in TermQuery / TermWeight.
(Simon Willnauer, Mike McCandless, Robert Muir)
+* LUCENE-2897: Apply deleted terms while flushing a segment. We still
+ buffer deleted terms to later apply to past segments. (Mike McCandless)
+
Bug fixes
* LUCENE-2633: PackedInts Packed32 and Packed64 did not support internal
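Illustration (not part of the commit): a minimal sketch of the semantics this
optimization preserves. Every buffered delete-by-term carries a docIDUpto
limit recorded when the delete arrived; with LUCENE-2897 that limit is now
enforced against the new segment while it is being flushed, rather than by a
separate delete pass afterwards. The Directory/analyzer setup is elided and
assumed, the API shape is 2011 trunk (4.0-dev), and doc(...) is a
hypothetical helper that builds a Document with one indexed field.

  IndexWriter w = new IndexWriter(dir,
      new IndexWriterConfig(Version.LUCENE_40, analyzer));
  w.addDocument(doc("id", "1"));           // in-RAM docID 0
  w.deleteDocuments(new Term("id", "1"));  // buffered with docIDUpto == 1
  w.addDocument(doc("id", "1"));           // in-RAM docID 1, added after the delete
  w.commit(); // flush resolves the term delete in-line: docID 0 (< 1) lands
              // in the new segment's .del file; docID 1 survives
  w.close();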
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Mon Jan 31 23:35:02 2011
@@ -18,21 +18,23 @@ package org.apache.lucene.index;
*/
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
-/** Holds buffered deletes, by docID, term or query for a
- * single segment. This is used to hold buffered pending
- * deletes against the to-be-flushed segment as well as
- * per-segment deletes for each segment in the index. */
+/* Holds buffered deletes, by docID, term or query for a
+ * single segment. This is used to hold buffered pending
+ * deletes against the to-be-flushed segment. Once the
+ * deletes are pushed (on flush in DocumentsWriter), these
+ * deletes are converted to a FrozenBufferedDeletes instance. */
// NOTE: we are sync'd by BufferedDeletes, ie, all access to
// instances of this class is via sync'd methods on
@@ -63,13 +65,8 @@ class BufferedDeletes {
undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_INT + 24;
- // TODO: many of the deletes stored here will map to
- // Integer.MAX_VALUE; we could be more efficient for this
- // case ie use a SortedSet not a SortedMap. But: Java's
- // SortedSet impls are simply backed by a Map so we won't
- // save anything unless we do something custom...
final AtomicInteger numTermDeletes = new AtomicInteger();
- final SortedMap<Term,Integer> terms = new TreeMap<Term,Integer>();
+ final Map<Term,Integer> terms;
final Map<Query,Integer> queries = new HashMap<Query,Integer>();
final List<Integer> docIDs = new ArrayList<Integer>();
@@ -81,6 +78,14 @@ class BufferedDeletes {
long gen;
+ public BufferedDeletes(boolean sortTerms) {
+ if (sortTerms) {
+ terms = new TreeMap<Term,Integer>();
+ } else {
+ terms = new HashMap<Term,Integer>();
+ }
+ }
+
@Override
public String toString() {
if (VERBOSE_DELETES) {
@@ -130,6 +135,26 @@ class BufferedDeletes {
// should already be cleared
}
+ void update(FrozenBufferedDeletes in) {
+ numTermDeletes.addAndGet(in.numTermDeletes);
+ for(Term term : in.terms) {
+ if (!terms.containsKey(term)) {
+ // only incr bytesUsed if this term wasn't already buffered:
+ bytesUsed.addAndGet(BYTES_PER_DEL_TERM);
+ }
+ terms.put(term, MAX_INT);
+ }
+
+ for(int queryIdx=0;queryIdx<in.queries.length;queryIdx++) {
+ final Query query = in.queries[queryIdx];
+ if (!queries.containsKey(query)) {
+ // only incr bytesUsed if this query wasn't already buffered:
+ bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
+ }
+ queries.put(query, MAX_INT);
+ }
+ }
+
public void addQuery(Query query, int docIDUpto) {
Integer current = queries.put(query, docIDUpto);
// increment bytes used only if the query wasn't added so far.
@@ -162,6 +187,43 @@ class BufferedDeletes {
bytesUsed.addAndGet(BYTES_PER_DEL_TERM + term.bytes.length);
}
}
+
+ public Iterable<Term> termsIterable() {
+ return new Iterable<Term>() {
+ // @Override -- not until Java 1.6
+ public Iterator<Term> iterator() {
+ return terms.keySet().iterator();
+ }
+ };
+ }
+
+ public Iterable<QueryAndLimit> queriesIterable() {
+ return new Iterable<QueryAndLimit>() {
+
+ // @Override -- not until Java 1.6
+ public Iterator<QueryAndLimit> iterator() {
+ return new Iterator<QueryAndLimit>() {
+ private final Iterator<Map.Entry<Query,Integer>> iter = queries.entrySet().iterator();
+
+ // @Override -- not until Java 1.6
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ // @Override -- not until Java 1.6
+ public QueryAndLimit next() {
+ final Map.Entry<Query,Integer> ent = iter.next();
+ return new QueryAndLimit(ent.getKey(), ent.getValue());
+ }
+
+ // @Override -- not until Java 1.6
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
void clear() {
terms.clear();
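Illustration (not part of the commit): the new sortTerms flag matches the two
ways a BufferedDeletes instance is consumed in this change. A sketch of the
two construction sites, as they appear elsewhere in this diff:

  // DocumentsWriter's per-flush buffer only ever does exact-term lookups
  // (FreqProxTermsWriter probes segDeletes.terms.get(...)), so hashing suffices:
  BufferedDeletes pendingDeletes = new BufferedDeletes(false);   // HashMap<Term,Integer>

  // Coalesced deletes are replayed against older segments through a single
  // forward-only TermsEnum, which requires visiting terms in sorted order:
  BufferedDeletes coalescedDeletes = new BufferedDeletes(true);  // TreeMap<Term,Integer>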
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java Mon Jan 31 23:35:02 2011
@@ -22,7 +22,6 @@ import java.io.PrintStream;
import java.util.List;
import java.util.ArrayList;
import java.util.Date;
-import java.util.Map.Entry;
import java.util.Comparator;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,7 +51,7 @@ import org.apache.lucene.search.Weight;
class BufferedDeletesStream {
// TODO: maybe linked list?
- private final List<BufferedDeletes> deletes = new ArrayList<BufferedDeletes>();
+ private final List<FrozenBufferedDeletes> deletes = new ArrayList<FrozenBufferedDeletes>();
// Starts at 1 so that SegmentInfos that have never had
// deletes applied (whose bufferedDelGen defaults to 0)
@@ -83,13 +82,13 @@ class BufferedDeletesStream {
// Appends a new packet of buffered deletes to the stream,
// setting its generation:
- public synchronized void push(BufferedDeletes packet) {
+ public synchronized void push(FrozenBufferedDeletes packet) {
assert packet.any();
assert checkDeleteStats();
- packet.gen = nextGen++;
+ assert packet.gen < nextGen;
deletes.add(packet);
- numTerms.addAndGet(packet.numTermDeletes.get());
- bytesUsed.addAndGet(packet.bytesUsed.get());
+ numTerms.addAndGet(packet.numTermDeletes);
+ bytesUsed.addAndGet(packet.bytesUsed);
if (infoStream != null) {
message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
}
@@ -182,14 +181,14 @@ class BufferedDeletesStream {
while (infosIDX >= 0) {
//System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
- final BufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
+ final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
final SegmentInfo info = infos2.get(infosIDX);
final long segGen = info.getBufferedDeletesGen();
if (packet != null && segGen < packet.gen) {
//System.out.println(" coalesce");
if (coalescedDeletes == null) {
- coalescedDeletes = new BufferedDeletes();
+ coalescedDeletes = new BufferedDeletes(true);
}
coalescedDeletes.update(packet);
delIDX--;
@@ -202,25 +201,25 @@ class BufferedDeletesStream {
int delCount = 0;
try {
if (coalescedDeletes != null) {
- delCount += applyDeletes(coalescedDeletes, reader);
+ //System.out.println(" del coalesced");
+ delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+ delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
}
- delCount += applyDeletes(packet, reader);
+ //System.out.println(" del exact");
+ // Don't delete by Term here; DocumentsWriter
+ // already did that on flush:
+ delCount += applyQueryDeletes(packet.queriesIterable(), reader);
} finally {
readerPool.release(reader);
}
anyNewDeletes |= delCount > 0;
- // We've applied doc ids, and they're only applied
- // on the current segment
- bytesUsed.addAndGet(-packet.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
- packet.clearDocIDs();
-
if (infoStream != null) {
message("seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount);
}
if (coalescedDeletes == null) {
- coalescedDeletes = new BufferedDeletes();
+ coalescedDeletes = new BufferedDeletes(true);
}
coalescedDeletes.update(packet);
delIDX--;
@@ -236,7 +235,8 @@ class BufferedDeletesStream {
SegmentReader reader = readerPool.get(info, false);
int delCount = 0;
try {
- delCount += applyDeletes(coalescedDeletes, reader);
+ delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+ delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
} finally {
readerPool.release(reader);
}
@@ -301,121 +301,122 @@ class BufferedDeletesStream {
message("pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain");
}
for(int delIDX=0;delIDX<count;delIDX++) {
- final BufferedDeletes packet = deletes.get(delIDX);
- numTerms.addAndGet(-packet.numTermDeletes.get());
+ final FrozenBufferedDeletes packet = deletes.get(delIDX);
+ numTerms.addAndGet(-packet.numTermDeletes);
assert numTerms.get() >= 0;
- bytesUsed.addAndGet(-packet.bytesUsed.get());
+ bytesUsed.addAndGet(-packet.bytesUsed);
assert bytesUsed.get() >= 0;
}
deletes.subList(0, count).clear();
}
}
- private synchronized long applyDeletes(BufferedDeletes deletes, SegmentReader reader) throws IOException {
-
+ // Delete by Term
+ private synchronized long applyTermDeletes(Iterable<Term> termsIter, SegmentReader reader) throws IOException {
long delCount = 0;
+ Fields fields = reader.fields();
+ if (fields == null) {
+ // This reader has no postings
+ return 0;
+ }
- assert checkDeleteTerm(null);
-
- if (deletes.terms.size() > 0) {
- Fields fields = reader.fields();
- if (fields == null) {
- // This reader has no postings
- return 0;
- }
-
- TermsEnum termsEnum = null;
+ TermsEnum termsEnum = null;
- String currentField = null;
- DocsEnum docs = null;
+ String currentField = null;
+ DocsEnum docs = null;
- for (Entry<Term,Integer> entry: deletes.terms.entrySet()) {
- Term term = entry.getKey();
- // Since we visit terms sorted, we gain performance
- // by re-using the same TermsEnum and seeking only
- // forwards
- if (term.field() != currentField) {
- assert currentField == null || currentField.compareTo(term.field()) < 0;
- currentField = term.field();
- Terms terms = fields.terms(currentField);
- if (terms != null) {
- termsEnum = terms.iterator();
- } else {
- termsEnum = null;
- }
- }
-
- if (termsEnum == null) {
- continue;
+ assert checkDeleteTerm(null);
+
+ for (Term term : termsIter) {
+ // Since we visit terms sorted, we gain performance
+ // by re-using the same TermsEnum and seeking only
+ // forwards
+ if (term.field() != currentField) {
+ assert currentField == null || currentField.compareTo(term.field()) < 0;
+ currentField = term.field();
+ Terms terms = fields.terms(currentField);
+ if (terms != null) {
+ termsEnum = terms.iterator();
+ } else {
+ termsEnum = null;
}
- assert checkDeleteTerm(term);
+ }
+
+ if (termsEnum == null) {
+ continue;
+ }
+ assert checkDeleteTerm(term);
+
+ // System.out.println(" term=" + term);
- if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
- DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
+ if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
+ DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
- if (docsEnum != null) {
- docs = docsEnum;
- final int limit = entry.getValue();
- while (true) {
- final int docID = docs.nextDoc();
- if (docID == DocsEnum.NO_MORE_DOCS || docID >= limit) {
- break;
- }
- reader.deleteDocument(docID);
- // TODO: we could/should change
- // reader.deleteDocument to return boolean
- // true if it did in fact delete, because here
- // we could be deleting an already-deleted doc
- // which makes this an upper bound:
- delCount++;
+ if (docsEnum != null) {
+ while (true) {
+ final int docID = docsEnum.nextDoc();
+ if (docID == DocsEnum.NO_MORE_DOCS) {
+ break;
}
+ reader.deleteDocument(docID);
+ // TODO: we could/should change
+ // reader.deleteDocument to return boolean
+ // true if it did in fact delete, because here
+ // we could be deleting an already-deleted doc
+ // which makes this an upper bound:
+ delCount++;
}
}
}
}
- // Delete by docID
- for (Integer docIdInt : deletes.docIDs) {
- int docID = docIdInt.intValue();
- reader.deleteDocument(docID);
- delCount++;
- }
-
- // Delete by query
- if (deletes.queries.size() > 0) {
- IndexSearcher searcher = new IndexSearcher(reader);
- assert searcher.getTopReaderContext().isAtomic;
- final AtomicReaderContext readerContext = (AtomicReaderContext) searcher.getTopReaderContext();
- try {
- for (Entry<Query, Integer> entry : deletes.queries.entrySet()) {
- Query query = entry.getKey();
- int limit = entry.getValue().intValue();
- Weight weight = query.weight(searcher);
- Scorer scorer = weight.scorer(readerContext, Weight.ScorerContext.def());
- if (scorer != null) {
- while(true) {
- int doc = scorer.nextDoc();
- if (doc >= limit)
- break;
-
- reader.deleteDocument(doc);
- // TODO: we could/should change
- // reader.deleteDocument to return boolean
- // true if it did in fact delete, because here
- // we could be deleting an already-deleted doc
- // which makes this an upper bound:
- delCount++;
- }
+ return delCount;
+ }
+
+ public static class QueryAndLimit {
+ public final Query query;
+ public final int limit;
+ public QueryAndLimit(Query query, int limit) {
+ this.query = query;
+ this.limit = limit;
+ }
+ }
+
+ // Delete by query
+ private synchronized long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentReader reader) throws IOException {
+ long delCount = 0;
+ IndexSearcher searcher = new IndexSearcher(reader);
+ assert searcher.getTopReaderContext().isAtomic;
+ final AtomicReaderContext readerContext = (AtomicReaderContext) searcher.getTopReaderContext();
+ try {
+ for (QueryAndLimit ent : queriesIter) {
+ Query query = ent.query;
+ int limit = ent.limit;
+ Weight weight = query.weight(searcher);
+ Scorer scorer = weight.scorer(readerContext, Weight.ScorerContext.def());
+ if (scorer != null) {
+ while(true) {
+ int doc = scorer.nextDoc();
+ if (doc >= limit)
+ break;
+
+ reader.deleteDocument(doc);
+ // TODO: we could/should change
+ // reader.deleteDocument to return boolean
+ // true if it did in fact delete, because here
+ // we could be deleting an already-deleted doc
+ // which makes this an upper bound:
+ delCount++;
}
}
- } finally {
- searcher.close();
}
+ } finally {
+ searcher.close();
}
return delCount;
}
-
+
// used only by assert
private boolean checkDeleteTerm(Term term) {
if (term != null) {
@@ -429,9 +430,9 @@ class BufferedDeletesStream {
private boolean checkDeleteStats() {
int numTerms2 = 0;
long bytesUsed2 = 0;
- for(BufferedDeletes packet : deletes) {
- numTerms2 += packet.numTermDeletes.get();
- bytesUsed2 += packet.bytesUsed.get();
+ for(FrozenBufferedDeletes packet : deletes) {
+ numTerms2 += packet.numTermDeletes;
+ bytesUsed2 += packet.bytesUsed;
}
assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
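Illustration (not part of the commit): a condensed sketch of the seek pattern
applyTermDeletes depends on, null checks elided. It is only correct because
the coalesced map is sorted (sortTerms=true), so consecutive seeks within a
field always move forwards; sortedTerms stands in for termsIterable():

  TermsEnum termsEnum = fields.terms(field).iterator();
  for (Term t : sortedTerms) {          // sorted order => forward-only seeks
    if (termsEnum.seek(t.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
      DocsEnum docs = termsEnum.docs(reader.getDeletedDocs(), null);
      int doc;
      while ((doc = docs.nextDoc()) != DocsEnum.NO_MORE_DOCS) {
        reader.deleteDocument(doc);     // may hit already-deleted docs, so
      }                                 // the returned count is an upper bound
    }
  }

Note that no per-term limit is applied here: a coalesced packet is strictly
newer than the segment it is applied to, so every matching doc qualifies
(which is why update() stores MAX_INT and the old per-entry limit check
could be dropped).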
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Jan 31 23:35:02 2011
@@ -35,9 +35,11 @@ import org.apache.lucene.store.AlreadyCl
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMFile;
import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.RecyclingByteBlockAllocator;
import org.apache.lucene.util.ThreadInterruptedException;
-import org.apache.lucene.util.RamUsageEstimator;
+
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
@@ -133,8 +135,9 @@ final class DocumentsWriter {
// this, they wait for others to finish first
private final int maxThreadStates;
+ // TODO: cutover to BytesRefHash
// Deletes for our still-in-RAM (to be flushed next) segment
- private BufferedDeletes pendingDeletes = new BufferedDeletes();
+ private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
static class DocState {
DocumentsWriter docWriter;
@@ -336,6 +339,9 @@ final class DocumentsWriter {
return doFlush;
}
+ // TODO: we could check w/ FreqProxTermsWriter: if the
+ // term doesn't exist, don't bother buffering into the
+ // per-DWPT map (but still must go into the global map)
boolean deleteTerm(Term term, boolean skipWait) {
final boolean doFlush = flushControl.waitUpdate(0, 1, skipWait);
synchronized(this) {
@@ -507,17 +513,19 @@ final class DocumentsWriter {
private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
// Lock order: DW -> BD
+ final long delGen = bufferedDeletesStream.getNextGen();
if (pendingDeletes.any()) {
if (segmentInfos.size() > 0 || newSegment != null) {
+ final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(pendingDeletes, delGen);
if (infoStream != null) {
message("flush: push buffered deletes");
}
- bufferedDeletesStream.push(pendingDeletes);
+ bufferedDeletesStream.push(packet);
if (infoStream != null) {
- message("flush: delGen=" + pendingDeletes.gen);
+ message("flush: delGen=" + packet.gen);
}
if (newSegment != null) {
- newSegment.setBufferedDeletesGen(pendingDeletes.gen);
+ newSegment.setBufferedDeletesGen(packet.gen);
}
} else {
if (infoStream != null) {
@@ -527,9 +535,9 @@ final class DocumentsWriter {
// there are no segments, the deletions cannot
// affect anything.
}
- pendingDeletes = new BufferedDeletes();
+ pendingDeletes.clear();
} else if (newSegment != null) {
- newSegment.setBufferedDeletesGen(bufferedDeletesStream.getNextGen());
+ newSegment.setBufferedDeletesGen(delGen);
}
}
@@ -580,7 +588,19 @@ final class DocumentsWriter {
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
numDocs, writer.getConfig().getTermIndexInterval(),
- SegmentCodecs.build(fieldInfos, writer.codecs));
+ SegmentCodecs.build(fieldInfos, writer.codecs),
+ pendingDeletes);
+ // Apply delete-by-docID now (delete-by-docID only
+ // happens when an exception is hit processing that
+ // doc, eg if analyzer has some problem w/ the text):
+ if (pendingDeletes.docIDs.size() > 0) {
+ flushState.deletedDocs = new BitVector(numDocs);
+ for(int delDocID : pendingDeletes.docIDs) {
+ flushState.deletedDocs.set(delDocID);
+ }
+ pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
+ pendingDeletes.docIDs.clear();
+ }
newSegment = new SegmentInfo(segment, numDocs, directory, false, fieldInfos.hasProx(), flushState.segmentCodecs, false);
@@ -592,10 +612,14 @@ final class DocumentsWriter {
double startMBUsed = bytesUsed()/1024./1024.;
consumer.flush(threads, flushState);
+
newSegment.setHasVectors(flushState.hasVectors);
if (infoStream != null) {
message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
+ if (flushState.deletedDocs != null) {
+ message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
+ }
message("flushedFiles=" + newSegment.files());
message("flushed codecs=" + newSegment.getSegmentCodecs());
}
@@ -616,6 +640,30 @@ final class DocumentsWriter {
newSegment.setUseCompoundFile(true);
}
+ // Must write deleted docs after the CFS so we don't
+ // slurp the del file into CFS:
+ if (flushState.deletedDocs != null) {
+ final int delCount = flushState.deletedDocs.count();
+ assert delCount > 0;
+ newSegment.setDelCount(delCount);
+ newSegment.advanceDelGen();
+ final String delFileName = newSegment.getDelFileName();
+ boolean success2 = false;
+ try {
+ flushState.deletedDocs.write(directory, delFileName);
+ success2 = true;
+ } finally {
+ if (!success2) {
+ try {
+ directory.deleteFile(delFileName);
+ } catch (Throwable t) {
+ // suppress this so we keep throwing the
+ // original exception
+ }
+ }
+ }
+ }
+
if (infoStream != null) {
message("flush: segment=" + newSegment);
final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
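Illustration (not part of the commit): the flush-time ordering this hunk
establishes, condensed. Step 4 must come after CFS creation, as the comment
in the diff notes, so the .del file is never packed into the compound file:

  // 1) seed flushState.deletedDocs from delete-by-docID (exception-hit docs)
  // 2) consumer.flush(...)  -- FreqProxTermsWriter may add flush-time term
  //    deletes to the same BitVector
  // 3) optionally build the CFS
  // 4) only then write the deletions:
  if (flushState.deletedDocs != null) {
    newSegment.setDelCount(flushState.deletedDocs.count());
    newSegment.advanceDelGen();
    flushState.deletedDocs.write(directory, newSegment.getDelFileName());
  }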
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java Mon Jan 31 23:35:02 2011
@@ -26,8 +26,9 @@ import java.util.Map;
import org.apache.lucene.index.codecs.FieldsConsumer;
import org.apache.lucene.index.codecs.PostingsConsumer;
-import org.apache.lucene.index.codecs.TermsConsumer;
import org.apache.lucene.index.codecs.TermStats;
+import org.apache.lucene.index.codecs.TermsConsumer;
+import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CollectionUtil;
@@ -108,7 +109,7 @@ final class FreqProxTermsWriter extends
// If this field has postings then add them to the
// segment
- appendPostings(fields, consumer);
+ appendPostings(fieldName, state, fields, consumer);
for(int i=0;i<fields.length;i++) {
TermsHashPerField perField = fields[i].termsHashPerField;
@@ -133,7 +134,8 @@ final class FreqProxTermsWriter extends
/* Walk through all unique text tokens (Posting
* instances) found in this field and serialize them
* into a single RAM segment. */
- void appendPostings(FreqProxTermsWriterPerField[] fields,
+ void appendPostings(String fieldName, SegmentWriteState state,
+ FreqProxTermsWriterPerField[] fields,
FieldsConsumer consumer)
throws CorruptIndexException, IOException {
@@ -156,11 +158,20 @@ final class FreqProxTermsWriter extends
assert result;
}
+ final Term protoTerm = new Term(fieldName);
+
FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields];
final boolean currentFieldOmitTermFreqAndPositions = fields[0].fieldInfo.omitTermFreqAndPositions;
//System.out.println("flush terms field=" + fields[0].fieldInfo.name);
+ final Map<Term,Integer> segDeletes;
+ if (state.segDeletes != null && state.segDeletes.terms.size() > 0) {
+ segDeletes = state.segDeletes.terms;
+ } else {
+ segDeletes = null;
+ }
+
// TODO: really TermsHashPerField should take over most
// of this loop, including merge sort of terms from
// multiple threads and interacting with the
@@ -195,6 +206,18 @@ final class FreqProxTermsWriter extends
final PostingsConsumer postingsConsumer = termsConsumer.startTerm(text);
+ final int delDocLimit;
+ if (segDeletes != null) {
+ final Integer docIDUpto = segDeletes.get(protoTerm.createTerm(text));
+ if (docIDUpto != null) {
+ delDocLimit = docIDUpto;
+ } else {
+ delDocLimit = 0;
+ }
+ } else {
+ delDocLimit = 0;
+ }
+
// Now termStates has numToMerge FieldMergeStates
// which all share the same term. Now we must
// interleave the docID streams.
@@ -214,7 +237,28 @@ final class FreqProxTermsWriter extends
assert minState.docID < flushedDocCount: "doc=" + minState.docID + " maxDoc=" + flushedDocCount;
+ // NOTE: we could check here if the docID was
+ // deleted, and skip it. However, this is somewhat
+ // dangerous because it can yield non-deterministic
+ // behavior since we may see the docID before we see
+ // the term that caused it to be deleted. This
+ // would mean some (but not all) of its postings may
+ // make it into the index, which'd alter the docFreq
+ // for those terms. We could fix this by doing two
+ // passes, ie first sweep marks all del docs, and
+ // 2nd sweep does the real flush, but I suspect
+ // that'd add too much time to flush.
+
postingsConsumer.startDoc(minState.docID, termDocFreq);
+ if (minState.docID < delDocLimit) {
+ // Mark it deleted. TODO: we could also skip
+ // writing its postings; this would be
+ // deterministic (just for this Term's docs).
+ if (state.deletedDocs == null) {
+ state.deletedDocs = new BitVector(state.numDocs);
+ }
+ state.deletedDocs.set(minState.docID);
+ }
final ByteSliceReader prox = minState.prox;
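Illustration (not part of the commit): a worked example of the delDocLimit
check, using made-up docIDs. Suppose the in-RAM segment saw addDocument for
term id:7 at docID 3 (four docs buffered so far), then
deleteDocuments(new Term("id", "7")) (recording docIDUpto = 4), then another
addDocument for id:7 at docID 9:

  // while flushing postings for term id:7, delDocLimit == 4:
  //   docID 3 < 4  -> state.deletedDocs.set(3)   (added before the delete)
  //   docID 9 >= 4 -> kept                       (added after the delete)

As the in-line comments explain, the doc is only marked deleted, not skipped,
so docFreq stays deterministic in a single pass.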
Added: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java?rev=1065855&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (added)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java Mon Jan 31 23:35:02 2011
@@ -0,0 +1,145 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
+
+/** Holds buffered deletes by term or query, once pushed.
+ * Pushed deletes are write-once, so we shift to a more
+ * memory-efficient data structure to hold them. We don't
+ * hold docIDs because these are applied on flush. */
+
+class FrozenBufferedDeletes {
+
+ /* Rough logic: Term is object w/
+ String field and String text (OBJ_HEADER + 2*POINTER).
+ We don't count Term's field since it's interned.
+ Term's text is String (OBJ_HEADER + 4*INT + POINTER +
+ OBJ_HEADER + string.length*CHAR). */
+ final static int BYTES_PER_DEL_TERM = 3*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 4*RamUsageEstimator.NUM_BYTES_INT;
+
+ /* Query we often undercount (say 24 bytes), plus int. */
+ final static int BYTES_PER_DEL_QUERY = RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_INT + 24;
+
+ // Terms, in sorted order:
+ // TODO: would be more RAM efficient to store BytesRef[],
+ // per field:
+ final Term[] terms;
+
+ // Parallel array of deleted query, and the docIDUpto for
+ // each
+ final Query[] queries;
+ final int[] queryLimits;
+ final int bytesUsed;
+ final int numTermDeletes;
+ final long gen;
+
+ public FrozenBufferedDeletes(BufferedDeletes deletes, long gen) {
+ terms = deletes.terms.keySet().toArray(new Term[deletes.terms.size()]);
+ queries = new Query[deletes.queries.size()];
+ queryLimits = new int[deletes.queries.size()];
+ int upto = 0;
+ for(Map.Entry<Query,Integer> ent : deletes.queries.entrySet()) {
+ queries[upto] = ent.getKey();
+ queryLimits[upto] = ent.getValue();
+ upto++;
+ }
+ bytesUsed = terms.length * BYTES_PER_DEL_TERM + queries.length * BYTES_PER_DEL_QUERY;
+ numTermDeletes = deletes.numTermDeletes.get();
+ this.gen = gen;
+ }
+
+ public Iterable<Term> termsIterable() {
+ return new Iterable<Term>() {
+ // @Override -- not until Java 1.6
+ public Iterator<Term> iterator() {
+ return new Iterator<Term>() {
+ private int upto;
+
+ // @Override -- not until Java 1.6
+ public boolean hasNext() {
+ return upto < terms.length;
+ }
+
+ // @Override -- not until Java 1.6
+ public Term next() {
+ return terms[upto++];
+ }
+
+ // @Override -- not until Java 1.6
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ public Iterable<QueryAndLimit> queriesIterable() {
+ return new Iterable<QueryAndLimit>() {
+ // @Override -- not until Java 1.6
+ public Iterator<QueryAndLimit> iterator() {
+ return new Iterator<QueryAndLimit>() {
+ private int upto;
+
+ // @Override -- not until Java 1.6
+ public boolean hasNext() {
+ return upto < queries.length;
+ }
+
+ // @Override -- not until Java 1.6
+ public QueryAndLimit next() {
+ QueryAndLimit ret = new QueryAndLimit(queries[upto], queryLimits[upto]);
+ upto++;
+ return ret;
+ }
+
+ // @Override -- not until Java 1.6
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ String s = "";
+ if (numTermDeletes != 0) {
+ s += " " + numTermDeletes + " deleted terms (unique count=" + terms.length + ")";
+ }
+ if (queries.length != 0) {
+ s += " " + queries.length + " deleted queries";
+ }
+ if (bytesUsed != 0) {
+ s += " bytesUsed=" + bytesUsed;
+ }
+
+ return s;
+ }
+
+ boolean any() {
+ return terms.length > 0 || queries.length > 0;
+ }
+}
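Illustration (not part of the commit): arithmetic sketch of the frozen
packet's RAM estimate, assuming 32-bit constants (NUM_BYTES_OBJECT_REF = 4,
NUM_BYTES_OBJECT_HEADER = 8, NUM_BYTES_INT = 4; on a 64-bit JVM the ref and
header sizes are larger):

  // BYTES_PER_DEL_TERM  = 3*4 + 3*8 + 4*4 = 52 bytes per unique term
  // BYTES_PER_DEL_QUERY = 4 + 4 + 24      = 32 bytes per query
  // so a packet with 1,000 unique terms and 10 queries reports
  // bytesUsed = 1000*52 + 10*32 = 52,320 bytes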
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java Mon Jan 31 23:35:02 2011
@@ -339,7 +339,7 @@ public class IndexWriter implements Clos
*/
IndexReader getReader(boolean applyAllDeletes) throws IOException {
ensureOpen();
-
+
final long tStart = System.currentTimeMillis();
if (infoStream != null) {
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/LogMergePolicy.java Mon Jan 31 23:35:02 2011
@@ -661,7 +661,7 @@ public abstract class LogMergePolicy ext
sb.append("maxMergeSizeForOptimize=").append(maxMergeSizeForOptimize).append(", ");
sb.append("calibrateSizeByDeletes=").append(calibrateSizeByDeletes).append(", ");
sb.append("maxMergeDocs=").append(maxMergeDocs).append(", ");
- sb.append("useCompoundFile=").append(useCompoundFile);
+ sb.append("useCompoundFile=").append(useCompoundFile).append(", ");
sb.append("requireContiguousMerge=").append(requireContiguousMerge);
sb.append("]");
return sb.toString();
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentMerger.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentMerger.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentMerger.java Mon Jan 31 23:35:02 2011
@@ -266,7 +266,7 @@ final class SegmentMerger {
// details.
throw new RuntimeException("mergeFields produced an invalid result: docCount is " + docCount + " but fdx file size is " + fdxFileLength + " file=" + fileName + " file exists?=" + directory.fileExists(fileName) + "; now aborting this merge to prevent index corruption");
- segmentWriteState = new SegmentWriteState(null, directory, segment, fieldInfos, docCount, termIndexInterval, codecInfo);
+ segmentWriteState = new SegmentWriteState(null, directory, segment, fieldInfos, docCount, termIndexInterval, codecInfo, null);
return docCount;
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentWriteState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentWriteState.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentWriteState.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentWriteState.java Mon Jan 31 23:35:02 2011
@@ -20,6 +20,7 @@ package org.apache.lucene.index;
import java.io.PrintStream;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BitVector;
/**
* @lucene.experimental
@@ -32,6 +33,16 @@ public class SegmentWriteState {
public final int numDocs;
public boolean hasVectors;
+ // Deletes to apply while we are flushing the segment. A
+ // Term is enrolled in here if it was deleted at one
+ // point, and it's mapped to the docIDUpto, meaning any
+ // docID < docIDUpto containing this term should be
+ // deleted.
+ public final BufferedDeletes segDeletes;
+
+ // Lazily created:
+ public BitVector deletedDocs;
+
final SegmentCodecs segmentCodecs;
public final String codecId;
@@ -57,8 +68,9 @@ public class SegmentWriteState {
public SegmentWriteState(PrintStream infoStream, Directory directory, String segmentName, FieldInfos fieldInfos,
- int numDocs, int termIndexInterval, SegmentCodecs segmentCodecs) {
+ int numDocs, int termIndexInterval, SegmentCodecs segmentCodecs, BufferedDeletes segDeletes) {
this.infoStream = infoStream;
+ this.segDeletes = segDeletes;
this.directory = directory;
this.segmentName = segmentName;
this.fieldInfos = fieldInfos;
@@ -80,5 +92,6 @@ public class SegmentWriteState {
termIndexInterval = state.termIndexInterval;
segmentCodecs = state.segmentCodecs;
this.codecId = codecId;
+ segDeletes = state.segDeletes;
}
}
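Illustration (not part of the commit): the two SegmentWriteState call sites
touched by this commit, condensed side by side (codecInfo stands in for the
SegmentCodecs argument). Flush passes the live pending deletes; a merge
passes null, since merges carry no flush-time term deletes:

  // DocumentsWriter.flush():
  new SegmentWriteState(infoStream, directory, segment, fieldInfos, numDocs,
                        termIndexInterval, codecInfo, pendingDeletes);
  // SegmentMerger:
  new SegmentWriteState(null, directory, segment, fieldInfos, docCount,
                        termIndexInterval, codecInfo, null);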
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/codecs/BlockTermsReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/codecs/BlockTermsReader.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/codecs/BlockTermsReader.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/codecs/BlockTermsReader.java Mon Jan 31 23:35:02 2011
@@ -108,7 +108,7 @@ public class BlockTermsReader extends Fi
}
}
- private String segment;
+ //private String segment;
public BlockTermsReader(TermsIndexReaderBase indexReader, Directory dir, FieldInfos fieldInfos, String segment, PostingsReaderBase postingsReader, int readBufferSize,
Comparator<BytesRef> termComp, int termsCacheSize, String codecId)
@@ -118,7 +118,7 @@ public class BlockTermsReader extends Fi
termsCache = new DoubleBarrelLRUCache<FieldAndTerm,BlockTermState>(termsCacheSize);
this.termComp = termComp;
- this.segment = segment;
+ //this.segment = segment;
in = dir.openInput(IndexFileNames.segmentFileName(segment, codecId, BlockTermsWriter.TERMS_EXTENSION),
readBufferSize);
Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestAddIndexes.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestAddIndexes.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestAddIndexes.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestAddIndexes.java Mon Jan 31 23:35:02 2011
@@ -157,6 +157,7 @@ public class TestAddIndexes extends Luce
setUpDirs(dir, aux);
IndexWriter writer = newWriter(dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()).setOpenMode(OpenMode.APPEND));
+ writer.setInfoStream(VERBOSE ? System.out : null);
writer.addIndexes(aux);
// Adds 10 docs, then replaces them with another 10
Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestCodecs.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestCodecs.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestCodecs.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestCodecs.java Mon Jan 31 23:35:02 2011
@@ -589,7 +589,7 @@ public class TestCodecs extends LuceneTe
final int termIndexInterval = _TestUtil.nextInt(random, 13, 27);
final SegmentCodecs codecInfo = SegmentCodecs.build(fieldInfos, CodecProvider.getDefault());
- final SegmentWriteState state = new SegmentWriteState(null, dir, SEGMENT, fieldInfos, 10000, termIndexInterval, codecInfo);
+ final SegmentWriteState state = new SegmentWriteState(null, dir, SEGMENT, fieldInfos, 10000, termIndexInterval, codecInfo, null);
final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state);
Arrays.sort(fields);
Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java Mon Jan 31 23:35:02 2011
@@ -2576,7 +2576,7 @@ public class TestIndexWriter extends Luc
count++;
}
}
- assertTrue("flush happened too quickly during " + (doIndexing ? "indexing" : "deleting") + " count=" + count, count > 2500);
+ assertTrue("flush happened too quickly during " + (doIndexing ? "indexing" : "deleting") + " count=" + count, count > 1500);
}
w.close();
dir.close();
Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterDelete.java Mon Jan 31 23:35:02 2011
@@ -157,8 +157,6 @@ public class TestIndexWriterDelete exten
assertEquals(0, modifier.getSegmentCount());
modifier.commit();
- modifier.commit();
-
IndexReader reader = IndexReader.open(dir, true);
assertEquals(1, reader.numDocs());
Modified: lucene/dev/trunk/modules/benchmark/conf/createLineFile.alg
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/modules/benchmark/conf/createLineFile.alg?rev=1065855&r1=1065854&r2=1065855&view=diff
==============================================================================
--- lucene/dev/trunk/modules/benchmark/conf/createLineFile.alg (original)
+++ lucene/dev/trunk/modules/benchmark/conf/createLineFile.alg Mon Jan 31 23:35:02 2011
@@ -29,10 +29,14 @@
#
# Where to get documents from:
-content.source=org.apache.lucene.benchmark.byTask.feeds.ReutersContentSource
+content.source=org.apache.lucene.benchmark.byTask.feeds.EnwikiContentSource
# Where to write the line file output:
-line.file.out=work/reuters.lines.txt
+line.file.out=/x/tmp/enwiki.out.txt
+
+docs.file=/x/lucene/data/enwiki/enwiki-20110115-pages-articles.xml
+
+keep.image.only.docs = false
# Stop after processing the document feed once:
content.source.forever=false
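Illustration (not part of the commit): the updated .alg is run through the
benchmark module's ant task runner. The target name below follows the
module's README conventions and is an assumption, as are the local paths the
commit hardcodes (/x/tmp, /x/lucene/data), which you would point at your own
enwiki dump and output location:

  cd modules/benchmark
  ant run-task -Dtask.alg=conf/createLineFile.alg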