Posted to commits@lucene.apache.org by bu...@apache.org on 2011/02/22 02:01:11 UTC
svn commit: r1073192 [7/32] - in /lucene/dev/branches/realtime_search: ./
dev-tools/eclipse/ dev-tools/idea/.idea/ dev-tools/idea/lucene/contrib/ant/
dev-tools/idea/lucene/contrib/demo/
dev-tools/idea/lucene/contrib/highlighter/ dev-tools/idea/lucene/c...
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Tue Feb 22 01:00:39 2011
@@ -17,440 +17,228 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import java.io.IOException;
-import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.HashMap;
-import java.util.Date;
-import java.util.Map.Entry;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.lucene.index.IndexReader.AtomicReaderContext;
-import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
-/** Holds a {@link SegmentDeletes} for each segment in the
- * index. */
+/* Holds buffered deletes, by docID, term or query, for a
+ * single segment. This is used to hold buffered pending
+ * deletes against the to-be-flushed segment. Once the
+ * deletes are pushed (on flush in DocumentsWriter), these
+ * deletes are converted to a FrozenBufferedDeletes instance. */
+
+// NOTE: we are sync'd by BufferedDeletes, ie, all access to
+// instances of this class is via sync'd methods on
+// BufferedDeletes
class BufferedDeletes {
- // Deletes for all flushed/merged segments:
- private final Map<SegmentInfo,SegmentDeletes> deletesMap = new HashMap<SegmentInfo,SegmentDeletes>();
-
- // used only by assert
- private Term lastDeleteTerm;
-
- private PrintStream infoStream;
- private final AtomicLong bytesUsed = new AtomicLong();
- private final AtomicInteger numTerms = new AtomicInteger();
- private final int messageID;
-
- public BufferedDeletes(int messageID) {
- this.messageID = messageID;
- }
-
- private synchronized void message(String message) {
- if (infoStream != null) {
- infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: BD " + message);
- }
- }
-
- public synchronized void setInfoStream(PrintStream infoStream) {
- this.infoStream = infoStream;
- }
-
- public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info) {
- pushDeletes(newDeletes, info, false);
- }
-
- // Moves all pending deletes onto the provided segment,
- // then clears the pending deletes
- public synchronized void pushDeletes(SegmentDeletes newDeletes, SegmentInfo info, boolean noLimit) {
- assert newDeletes.any();
- numTerms.addAndGet(newDeletes.numTermDeletes.get());
-
- if (!noLimit) {
- assert !deletesMap.containsKey(info);
- assert info != null;
- deletesMap.put(info, newDeletes);
- bytesUsed.addAndGet(newDeletes.bytesUsed.get());
+ /* Rough logic: HashMap has an array[Entry] w/ varying
+ load factor (say 2 * POINTER). Entry is object w/ Term
+ key, Integer val, int hash, Entry next
+ (OBJ_HEADER + 3*POINTER + INT). Term is object w/
+ String field and String text (OBJ_HEADER + 2*POINTER).
+ We don't count Term's field since it's interned.
+ Term's text is String (OBJ_HEADER + 4*INT + POINTER +
+ OBJ_HEADER + string.length*CHAR). Integer is
+ OBJ_HEADER + INT. */
+ final static int BYTES_PER_DEL_TERM = 8*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 5*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 6*RamUsageEstimator.NUM_BYTES_INT;
+
+ /* Rough logic: del docIDs are List<Integer>. Say list
+ allocates ~2X size (2*POINTER). Integer is OBJ_HEADER
+ + int */
+ final static int BYTES_PER_DEL_DOCID = 2*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT;
+
+ /* Rough logic: HashMap has an array[Entry] w/ varying
+ load factor (say 2 * POINTER). Entry is object w/
+ Query key, Integer val, int hash, Entry next
+ (OBJ_HEADER + 3*POINTER + INT). Query we often
+ undercount (say 24 bytes). Integer is OBJ_HEADER + INT. */
+ final static int BYTES_PER_DEL_QUERY = 5*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 2*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 2*RamUsageEstimator.NUM_BYTES_INT + 24;
+
+ final AtomicInteger numTermDeletes = new AtomicInteger();
+ final Map<Term,Integer> terms;
+ final Map<Query,Integer> queries = new HashMap<Query,Integer>();
+ final List<Integer> docIDs = new ArrayList<Integer>();
+
+ public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
+
+ final AtomicLong bytesUsed = new AtomicLong();
+
+ private final static boolean VERBOSE_DELETES = false;
+
+ long gen;
+
+ public BufferedDeletes(boolean sortTerms) {
+ if (sortTerms) {
+ terms = new TreeMap<Term,Integer>();
} else {
- final SegmentDeletes deletes = getDeletes(info);
- bytesUsed.addAndGet(-deletes.bytesUsed.get());
- deletes.update(newDeletes, noLimit);
- bytesUsed.addAndGet(deletes.bytesUsed.get());
+ terms = new HashMap<Term,Integer>();
}
- if (infoStream != null) {
- message("push deletes seg=" + info + " dels=" + getDeletes(info));
- }
- assert checkDeleteStats();
- }
-
- public synchronized void clear() {
- deletesMap.clear();
- numTerms.set(0);
- bytesUsed.set(0);
- }
-
- synchronized boolean any() {
- return bytesUsed.get() != 0;
- }
-
- public int numTerms() {
- return numTerms.get();
}
- public long bytesUsed() {
- return bytesUsed.get();
- }
-
- // IW calls this on finishing a merge. While the merge
- // was running, it's possible new deletes were pushed onto
- // our last (and only our last) segment. In this case we
- // must carry forward those deletes onto the merged
- // segment.
- synchronized void commitMerge(MergePolicy.OneMerge merge) {
- assert checkDeleteStats();
- if (infoStream != null) {
- message("commitMerge merge.info=" + merge.info + " merge.segments=" + merge.segments);
- }
- final SegmentInfo lastInfo = merge.segments.lastElement();
- final SegmentDeletes lastDeletes = deletesMap.get(lastInfo);
- if (lastDeletes != null) {
- deletesMap.remove(lastInfo);
- assert !deletesMap.containsKey(merge.info);
- deletesMap.put(merge.info, lastDeletes);
- // don't need to update numTerms/bytesUsed since we
- // are just moving the deletes from one info to
- // another
- if (infoStream != null) {
- message("commitMerge done: new deletions=" + lastDeletes);
+ @Override
+ public String toString() {
+ if (VERBOSE_DELETES) {
+ return "gen=" + gen + " numTerms=" + numTermDeletes + ", terms=" + terms
+ + ", queries=" + queries + ", docIDs=" + docIDs + ", bytesUsed="
+ + bytesUsed;
+ } else {
+ String s = "gen=" + gen;
+ if (numTermDeletes.get() != 0) {
+ s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
}
- } else if (infoStream != null) {
- message("commitMerge done: no new deletions");
- }
- assert !anyDeletes(merge.segments.range(0, merge.segments.size()-1));
- assert checkDeleteStats();
- }
-
- synchronized void clear(SegmentDeletes deletes) {
- deletes.clear();
- }
-
- public synchronized boolean applyDeletes(IndexWriter.ReaderPool readerPool, SegmentInfos segmentInfos, SegmentInfos applyInfos) throws IOException {
- if (!any()) {
- return false;
- }
- final long t0 = System.currentTimeMillis();
-
- if (infoStream != null) {
- message("applyDeletes: applyInfos=" + applyInfos + "; index=" + segmentInfos);
- }
-
- assert checkDeleteStats();
-
- assert applyInfos.size() > 0;
-
- boolean any = false;
-
- final SegmentInfo lastApplyInfo = applyInfos.lastElement();
- final int lastIdx = segmentInfos.indexOf(lastApplyInfo);
-
- final SegmentInfo firstInfo = applyInfos.firstElement();
- final int firstIdx = segmentInfos.indexOf(firstInfo);
-
- // applyInfos must be a slice of segmentInfos
- assert lastIdx - firstIdx + 1 == applyInfos.size();
-
- // iterate over all segment infos backwards
- // coalesceing deletes along the way
- // when we're at or below the last of the
- // segments to apply to, start applying the deletes
- // we traverse up to the first apply infos
- SegmentDeletes coalescedDeletes = null;
- boolean hasDeletes = false;
- for (int segIdx=segmentInfos.size()-1; segIdx >= firstIdx; segIdx--) {
- final SegmentInfo info = segmentInfos.info(segIdx);
- final SegmentDeletes deletes = deletesMap.get(info);
- assert deletes == null || deletes.any();
-
- if (deletes == null && coalescedDeletes == null) {
- continue;
+ if (queries.size() != 0) {
+ s += " " + queries.size() + " deleted queries";
}
-
- if (infoStream != null) {
- message("applyDeletes: seg=" + info + " segment's deletes=[" + (deletes == null ? "null" : deletes) + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "]");
+ if (docIDs.size() != 0) {
+ s += " " + docIDs.size() + " deleted docIDs";
+ }
+ if (bytesUsed.get() != 0) {
+ s += " bytesUsed=" + bytesUsed.get();
}
- hasDeletes |= deletes != null;
-
- if (segIdx <= lastIdx && hasDeletes) {
-
- final long delCountInc = applyDeletes(readerPool, info, coalescedDeletes, deletes);
+ return s;
+ }
+ }
- if (delCountInc != 0) {
- any = true;
- }
- if (infoStream != null) {
- message("deletes touched " + delCountInc + " docIDs");
- }
-
- if (deletes != null) {
- // we've applied doc ids, and they're only applied
- // on the current segment
- bytesUsed.addAndGet(-deletes.docIDs.size() * SegmentDeletes.BYTES_PER_DEL_DOCID);
- deletes.clearDocIDs();
- }
+ void update(BufferedDeletes in) {
+ numTermDeletes.addAndGet(in.numTermDeletes.get());
+ for (Map.Entry<Term,Integer> ent : in.terms.entrySet()) {
+ final Term term = ent.getKey();
+ if (!terms.containsKey(term)) {
+ // only incr bytesUsed if this term wasn't already buffered:
+ bytesUsed.addAndGet(BYTES_PER_DEL_TERM);
}
+ terms.put(term, MAX_INT);
+ }
- // now coalesce at the max limit
- if (deletes != null) {
- if (coalescedDeletes == null) {
- coalescedDeletes = new SegmentDeletes();
- }
- // TODO: we could make this single pass (coalesce as
- // we apply the deletes
- coalescedDeletes.update(deletes, true);
+ for (Map.Entry<Query,Integer> ent : in.queries.entrySet()) {
+ final Query query = ent.getKey();
+ if (!queries.containsKey(query)) {
+ // only incr bytesUsed if this query wasn't already buffered:
+ bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
}
+ queries.put(query, MAX_INT);
}
- // move all deletes to segment just before our merge.
- if (firstIdx > 0) {
-
- SegmentDeletes mergedDeletes = null;
- // TODO: we could also make this single pass
- for (SegmentInfo info : applyInfos) {
- final SegmentDeletes deletes = deletesMap.get(info);
- if (deletes != null) {
- assert deletes.any();
- if (mergedDeletes == null) {
- mergedDeletes = getDeletes(segmentInfos.info(firstIdx-1));
- numTerms.addAndGet(-mergedDeletes.numTermDeletes.get());
- assert numTerms.get() >= 0;
- bytesUsed.addAndGet(-mergedDeletes.bytesUsed.get());
- assert bytesUsed.get() >= 0;
- }
-
- mergedDeletes.update(deletes, true);
- }
- }
+ // docIDs never move across segments and the docIDs
+ // should already be cleared
+ }
- if (mergedDeletes != null) {
- numTerms.addAndGet(mergedDeletes.numTermDeletes.get());
- bytesUsed.addAndGet(mergedDeletes.bytesUsed.get());
+ void update(FrozenBufferedDeletes in) {
+ numTermDeletes.addAndGet(in.numTermDeletes);
+ for(Term term : in.terms) {
+ if (!terms.containsKey(term)) {
+ // only incr bytesUsed if this term wasn't already buffered:
+ bytesUsed.addAndGet(BYTES_PER_DEL_TERM);
}
+ terms.put(term, MAX_INT);
+ }
- if (infoStream != null) {
- if (mergedDeletes != null) {
- message("applyDeletes: merge all deletes into seg=" + segmentInfos.info(firstIdx-1) + ": " + mergedDeletes);
- } else {
- message("applyDeletes: no deletes to merge");
- }
+ for(int queryIdx=0;queryIdx<in.queries.length;queryIdx++) {
+ final Query query = in.queries[queryIdx];
+ if (!queries.containsKey(query)) {
+ // only incr bytesUsed if this query wasn't already buffered:
+ bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
}
- } else {
- // We drop the deletes in this case, because we've
- // applied them to segment infos starting w/ the first
- // segment. There are no prior segments so there's no
- // reason to keep them around. When the applyInfos ==
- // segmentInfos this means all deletes have been
- // removed:
+ queries.put(query, MAX_INT);
}
- remove(applyInfos);
-
- assert checkDeleteStats();
- assert applyInfos != segmentInfos || !any();
+ }
- if (infoStream != null) {
- message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+ public void addQuery(Query query, int docIDUpto) {
+ Integer current = queries.put(query, docIDUpto);
+ // increment bytes used only if the query wasn't added so far.
+ if (current == null) {
+ bytesUsed.addAndGet(BYTES_PER_DEL_QUERY);
}
- return any;
}
- private synchronized long applyDeletes(IndexWriter.ReaderPool readerPool,
- SegmentInfo info,
- SegmentDeletes coalescedDeletes,
- SegmentDeletes segmentDeletes) throws IOException {
- assert readerPool.infoIsLive(info);
-
- assert coalescedDeletes == null || coalescedDeletes.docIDs.size() == 0;
+ public void addDocID(int docID) {
+ docIDs.add(Integer.valueOf(docID));
+ bytesUsed.addAndGet(BYTES_PER_DEL_DOCID);
+ }
- long delCount = 0;
+ public void addTerm(Term term, int docIDUpto) {
+ Integer current = terms.get(term);
+ if (current != null && docIDUpto < current) {
+ // Only record the new number if it's greater than the
+ // current one. This is important because if multiple
+ // threads are replacing the same doc at nearly the
+ // same time, it's possible that one thread that got a
+ // higher docID is scheduled before the other
+ // threads. If we blindly replace then we can
+ // incorrectly get both docs indexed.
+ return;
+ }
- // Lock order: IW -> BD -> RP
- SegmentReader reader = readerPool.get(info, false);
- try {
- if (coalescedDeletes != null) {
- delCount += applyDeletes(coalescedDeletes, reader);
- }
- if (segmentDeletes != null) {
- delCount += applyDeletes(segmentDeletes, reader);
- }
- } finally {
- readerPool.release(reader);
+ terms.put(term, Integer.valueOf(docIDUpto));
+ numTermDeletes.incrementAndGet();
+ if (current == null) {
+ bytesUsed.addAndGet(BYTES_PER_DEL_TERM + term.bytes.length);
}
- return delCount;
}
- private synchronized long applyDeletes(SegmentDeletes deletes, SegmentReader reader) throws IOException {
-
- long delCount = 0;
-
- assert checkDeleteTerm(null);
-
- if (deletes.terms.size() > 0) {
- Fields fields = reader.fields();
- if (fields == null) {
- // This reader has no postings
- return 0;
+ public Iterable<Term> termsIterable() {
+ return new Iterable<Term>() {
+ // @Override -- not until Java 1.6
+ public Iterator<Term> iterator() {
+ return terms.keySet().iterator();
}
+ };
+ }
- TermsEnum termsEnum = null;
-
- String currentField = null;
- DocsEnum docs = null;
+ public Iterable<QueryAndLimit> queriesIterable() {
+ return new Iterable<QueryAndLimit>() {
+
+ // @Override -- not until Java 1.6
+ public Iterator<QueryAndLimit> iterator() {
+ return new Iterator<QueryAndLimit>() {
+ private final Iterator<Map.Entry<Query,Integer>> iter = queries.entrySet().iterator();
- for (Entry<Term,Integer> entry: deletes.terms.entrySet()) {
- Term term = entry.getKey();
- // Since we visit terms sorted, we gain performance
- // by re-using the same TermsEnum and seeking only
- // forwards
- if (term.field() != currentField) {
- assert currentField == null || currentField.compareTo(term.field()) < 0;
- currentField = term.field();
- Terms terms = fields.terms(currentField);
- if (terms != null) {
- termsEnum = terms.iterator();
- } else {
- termsEnum = null;
+ // @Override -- not until Java 1.6
+ public boolean hasNext() {
+ return iter.hasNext();
}
- }
- if (termsEnum == null) {
- continue;
- }
- assert checkDeleteTerm(term);
-
- if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
- DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
-
- if (docsEnum != null) {
- docs = docsEnum;
- final int limit = entry.getValue();
- while (true) {
- final int docID = docs.nextDoc();
- if (docID == DocsEnum.NO_MORE_DOCS || docID >= limit) {
- break;
- }
- reader.deleteDocument(docID);
- // TODO: we could/should change
- // reader.deleteDocument to return boolean
- // true if it did in fact delete, because here
- // we could be deleting an already-deleted doc
- // which makes this an upper bound:
- delCount++;
- }
+ // @Override -- not until Java 1.6
+ public QueryAndLimit next() {
+ final Map.Entry<Query,Integer> ent = iter.next();
+ return new QueryAndLimit(ent.getKey(), ent.getValue());
}
- }
- }
- }
-
- // Delete by docID
- for (Integer docIdInt : deletes.docIDs) {
- int docID = docIdInt.intValue();
- reader.deleteDocument(docID);
- delCount++;
- }
- // Delete by query
- if (deletes.queries.size() > 0) {
- IndexSearcher searcher = new IndexSearcher(reader);
- assert searcher.getTopReaderContext().isAtomic;
- final AtomicReaderContext readerContext = (AtomicReaderContext) searcher.getTopReaderContext();
- try {
- for (Entry<Query, Integer> entry : deletes.queries.entrySet()) {
- Query query = entry.getKey();
- int limit = entry.getValue().intValue();
- Weight weight = query.weight(searcher);
- Scorer scorer = weight.scorer(readerContext, Weight.ScorerContext.def());
- if (scorer != null) {
- while(true) {
- int doc = scorer.nextDoc();
- if (doc >= limit)
- break;
-
- reader.deleteDocument(doc);
- // TODO: we could/should change
- // reader.deleteDocument to return boolean
- // true if it did in fact delete, because here
- // we could be deleting an already-deleted doc
- // which makes this an upper bound:
- delCount++;
- }
+ // @Override -- not until Java 1.6
+ public void remove() {
+ throw new UnsupportedOperationException();
}
- }
- } finally {
- searcher.close();
- }
- }
-
- return delCount;
- }
-
- public synchronized SegmentDeletes getDeletes(SegmentInfo info) {
- SegmentDeletes deletes = deletesMap.get(info);
- if (deletes == null) {
- deletes = new SegmentDeletes();
- deletesMap.put(info, deletes);
- }
- return deletes;
- }
-
- public synchronized void remove(SegmentInfos infos) {
- assert infos.size() > 0;
- for (SegmentInfo info : infos) {
- SegmentDeletes deletes = deletesMap.get(info);
- if (deletes != null) {
- bytesUsed.addAndGet(-deletes.bytesUsed.get());
- assert bytesUsed.get() >= 0: "bytesUsed=" + bytesUsed;
- numTerms.addAndGet(-deletes.numTermDeletes.get());
- assert numTerms.get() >= 0: "numTerms=" + numTerms;
- deletesMap.remove(info);
+ };
}
- }
- }
-
- // used only by assert
- private boolean anyDeletes(SegmentInfos infos) {
- for(SegmentInfo info : infos) {
- if (deletesMap.containsKey(info)) {
- return true;
- }
- }
- return false;
- }
-
- // used only by assert
- private boolean checkDeleteTerm(Term term) {
- if (term != null) {
- assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
- }
- lastDeleteTerm = term;
- return true;
+ };
+ }
+
+ void clear() {
+ terms.clear();
+ queries.clear();
+ docIDs.clear();
+ numTermDeletes.set(0);
+ bytesUsed.set(0);
}
-
- // only for assert
- private boolean checkDeleteStats() {
- int numTerms2 = 0;
- long bytesUsed2 = 0;
- for(SegmentDeletes deletes : deletesMap.values()) {
- numTerms2 += deletes.numTermDeletes.get();
- bytesUsed2 += deletes.bytesUsed.get();
- }
- assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
- assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
- return true;
+
+ void clearDocIDs() {
+ bytesUsed.addAndGet(-docIDs.size()*BYTES_PER_DEL_DOCID);
+ docIDs.clear();
+ }
+
+ boolean any() {
+ return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0;
}
}
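
For readers tracing the new accounting: with illustrative 64-bit JVM
values (object reference 8 bytes, object header 16, int 4), the
BYTES_PER_DEL_TERM formula above works out to 8*8 + 5*16 + 6*4 = 168
bytes per buffered delete term, before adding the term's text bytes.
The docIDUpto logic in addTerm is subtler; the following standalone
sketch (a hypothetical class, not part of this commit) shows why a
smaller docIDUpto must never overwrite a larger one when concurrent
threads replace the same document:

    import java.util.HashMap;
    import java.util.Map;

    class BufferedTermDeletesSketch {
      // term -> docIDUpto: the term deletes only docs with docID < docIDUpto
      private final Map<String,Integer> terms = new HashMap<String,Integer>();

      void addTerm(String term, int docIDUpto) {
        Integer current = terms.get(term);
        if (current != null && docIDUpto < current.intValue()) {
          // A later replace already recorded a higher limit; keeping the
          // max prevents deleting the newer copy of the document.
          return;
        }
        terms.put(term, Integer.valueOf(docIDUpto));
      }

      public static void main(String[] args) {
        BufferedTermDeletesSketch bd = new BufferedTermDeletesSketch();
        bd.addTerm("id:42", 7);  // thread A's replace landed at docID 6
        bd.addTerm("id:42", 5);  // thread B's older replace arrives late
        System.out.println(bd.terms);  // {id:42=7} -- the higher limit wins
      }
    }
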
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1073192&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java Tue Feb 22 01:00:39 2011
@@ -0,0 +1,465 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Comparator;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.index.IndexReader.AtomicReaderContext;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+
+/* Tracks the stream of {@link BufferedDeletes}.
+ * When DocumentsWriter flushes, its buffered
+ * deletes are appended to this stream. We later
+ * apply these deletes (resolve them to the actual
+ * docIDs, per segment) when a merge is started
+ * (only to the to-be-merged segments). We
+ * also apply to all segments when NRT reader is pulled,
+ * commit/close is called, or when too many deletes are
+ * buffered and must be flushed (by RAM usage or by count).
+ *
+ * Each packet is assigned a generation, and each flushed or
+ * merged segment is also assigned a generation, so we can
+ * track which BufferedDeletes packets to apply to any given
+ * segment. */
+
+class BufferedDeletesStream {
+
+ // TODO: maybe linked list?
+ private final List<FrozenBufferedDeletes> deletes = new ArrayList<FrozenBufferedDeletes>();
+
+ // Starts at 1 so that SegmentInfos that have never had
+ // deletes applied (whose bufferedDelGen defaults to 0)
+ // will be correct:
+ private long nextGen = 1;
+
+ // used only by assert
+ private Term lastDeleteTerm;
+
+ private PrintStream infoStream;
+ private final AtomicLong bytesUsed = new AtomicLong();
+ private final AtomicInteger numTerms = new AtomicInteger();
+ private final int messageID;
+
+ public BufferedDeletesStream(int messageID) {
+ this.messageID = messageID;
+ }
+
+ private synchronized void message(String message) {
+ if (infoStream != null) {
+ infoStream.println("BD " + messageID + " [" + new Date() + "; " + Thread.currentThread().getName() + "]: " + message);
+ }
+ }
+
+ public synchronized void setInfoStream(PrintStream infoStream) {
+ this.infoStream = infoStream;
+ }
+
+ // Appends a new packet of buffered deletes to the stream,
+ // setting its generation:
+ public synchronized void push(FrozenBufferedDeletes packet) {
+ assert packet.any();
+ assert checkDeleteStats();
+ assert packet.gen < nextGen;
+ deletes.add(packet);
+ numTerms.addAndGet(packet.numTermDeletes);
+ bytesUsed.addAndGet(packet.bytesUsed);
+ if (infoStream != null) {
+ message("push deletes " + packet + " delGen=" + packet.gen + " packetCount=" + deletes.size());
+ }
+ assert checkDeleteStats();
+ }
+
+ public synchronized void clear() {
+ deletes.clear();
+ nextGen = 1;
+ numTerms.set(0);
+ bytesUsed.set(0);
+ }
+
+ public boolean any() {
+ return bytesUsed.get() != 0;
+ }
+
+ public int numTerms() {
+ return numTerms.get();
+ }
+
+ public long bytesUsed() {
+ return bytesUsed.get();
+ }
+
+ public static class ApplyDeletesResult {
+ // True if any actual deletes took place:
+ public final boolean anyDeletes;
+
+ // Current gen, for the merged segment:
+ public final long gen;
+
+ // If non-null, contains segments that are 100% deleted
+ public final SegmentInfos allDeleted;
+
+ ApplyDeletesResult(boolean anyDeletes, long gen, SegmentInfos allDeleted) {
+ this.anyDeletes = anyDeletes;
+ this.gen = gen;
+ this.allDeleted = allDeleted;
+ }
+ }
+
+ // Sorts SegmentInfos from smallest to biggest bufferedDelGen:
+ private static final Comparator<SegmentInfo> sortByDelGen = new Comparator<SegmentInfo>() {
+ // @Override -- not until Java 1.6
+ public int compare(SegmentInfo si1, SegmentInfo si2) {
+ final long cmp = si1.getBufferedDeletesGen() - si2.getBufferedDeletesGen();
+ if (cmp > 0) {
+ return 1;
+ } else if (cmp < 0) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return sortByDelGen == other;
+ }
+ };
+
+ /** Resolves the buffered deleted Term/Query/docIDs into
+ * actual deleted docIDs in the deletedDocs BitVector for
+ * each SegmentReader. */
+ public synchronized ApplyDeletesResult applyDeletes(IndexWriter.ReaderPool readerPool, SegmentInfos infos) throws IOException {
+ final long t0 = System.currentTimeMillis();
+
+ if (infos.size() == 0) {
+ return new ApplyDeletesResult(false, nextGen++, null);
+ }
+
+ assert checkDeleteStats();
+
+ if (!any()) {
+ message("applyDeletes: no deletes; skipping");
+ return new ApplyDeletesResult(false, nextGen++, null);
+ }
+
+ if (infoStream != null) {
+ message("applyDeletes: infos=" + infos + " packetCount=" + deletes.size());
+ }
+
+ SegmentInfos infos2 = new SegmentInfos();
+ infos2.addAll(infos);
+ Collections.sort(infos2, sortByDelGen);
+
+ BufferedDeletes coalescedDeletes = null;
+ boolean anyNewDeletes = false;
+
+ int infosIDX = infos2.size()-1;
+ int delIDX = deletes.size()-1;
+
+ SegmentInfos allDeleted = null;
+
+ while (infosIDX >= 0) {
+ //System.out.println("BD: cycle delIDX=" + delIDX + " infoIDX=" + infosIDX);
+
+ final FrozenBufferedDeletes packet = delIDX >= 0 ? deletes.get(delIDX) : null;
+ final SegmentInfo info = infos2.get(infosIDX);
+ final long segGen = info.getBufferedDeletesGen();
+
+ if (packet != null && segGen < packet.gen) {
+ //System.out.println(" coalesce");
+ if (coalescedDeletes == null) {
+ coalescedDeletes = new BufferedDeletes(true);
+ }
+ coalescedDeletes.update(packet);
+ delIDX--;
+ } else if (packet != null && segGen == packet.gen) {
+ //System.out.println(" eq");
+
+ // Lock order: IW -> BD -> RP
+ assert readerPool.infoIsLive(info);
+ SegmentReader reader = readerPool.get(info, false);
+ int delCount = 0;
+ final boolean segAllDeletes;
+ try {
+ if (coalescedDeletes != null) {
+ //System.out.println(" del coalesced");
+ delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+ delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
+ }
+ //System.out.println(" del exact");
+ // Don't delete by Term here; DocumentsWriter
+ // already did that on flush:
+ delCount += applyQueryDeletes(packet.queriesIterable(), reader);
+ segAllDeletes = reader.numDocs() == 0;
+ } finally {
+ readerPool.release(reader);
+ }
+ anyNewDeletes |= delCount > 0;
+
+ if (segAllDeletes) {
+ if (allDeleted == null) {
+ allDeleted = new SegmentInfos();
+ }
+ allDeleted.add(info);
+ }
+
+ if (infoStream != null) {
+ message("seg=" + info + " segGen=" + segGen + " segDeletes=[" + packet + "]; coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+ }
+
+ if (coalescedDeletes == null) {
+ coalescedDeletes = new BufferedDeletes(true);
+ }
+ coalescedDeletes.update(packet);
+ delIDX--;
+ infosIDX--;
+ info.setBufferedDeletesGen(nextGen);
+
+ } else {
+ //System.out.println(" gt");
+
+ if (coalescedDeletes != null) {
+ // Lock order: IW -> BD -> RP
+ assert readerPool.infoIsLive(info);
+ SegmentReader reader = readerPool.get(info, false);
+ int delCount = 0;
+ final boolean segAllDeletes;
+ try {
+ delCount += applyTermDeletes(coalescedDeletes.termsIterable(), reader);
+ delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), reader);
+ segAllDeletes = reader.numDocs() == 0;
+ } finally {
+ readerPool.release(reader);
+ }
+ anyNewDeletes |= delCount > 0;
+
+ if (segAllDeletes) {
+ if (allDeleted == null) {
+ allDeleted = new SegmentInfos();
+ }
+ allDeleted.add(info);
+ }
+
+ if (infoStream != null) {
+ message("seg=" + info + " segGen=" + segGen + " coalesced deletes=[" + (coalescedDeletes == null ? "null" : coalescedDeletes) + "] delCount=" + delCount + (segAllDeletes ? " 100% deleted" : ""));
+ }
+ }
+ info.setBufferedDeletesGen(nextGen);
+
+ infosIDX--;
+ }
+ }
+
+ assert checkDeleteStats();
+ if (infoStream != null) {
+ message("applyDeletes took " + (System.currentTimeMillis()-t0) + " msec");
+ }
+ // assert infos != segmentInfos || !any() : "infos=" + infos + " segmentInfos=" + segmentInfos + " any=" + any;
+
+ return new ApplyDeletesResult(anyNewDeletes, nextGen++, allDeleted);
+ }
+
+ public synchronized long getNextGen() {
+ return nextGen++;
+ }
+
+ // Lock order IW -> BD
+ /* Removes any BufferedDeletes that we no longer need to
+ * store because all segments in the index have had the
+ * deletes applied. */
+ public synchronized void prune(SegmentInfos segmentInfos) {
+ assert checkDeleteStats();
+ long minGen = Long.MAX_VALUE;
+ for(SegmentInfo info : segmentInfos) {
+ minGen = Math.min(info.getBufferedDeletesGen(), minGen);
+ }
+
+ if (infoStream != null) {
+ message("prune sis=" + segmentInfos + " minGen=" + minGen + " packetCount=" + deletes.size());
+ }
+
+ final int limit = deletes.size();
+ for(int delIDX=0;delIDX<limit;delIDX++) {
+ if (deletes.get(delIDX).gen >= minGen) {
+ prune(delIDX);
+ assert checkDeleteStats();
+ return;
+ }
+ }
+
+ // All deletes pruned
+ prune(limit);
+ assert !any();
+ assert checkDeleteStats();
+ }
+
+ private synchronized void prune(int count) {
+ if (count > 0) {
+ if (infoStream != null) {
+ message("pruneDeletes: prune " + count + " packets; " + (deletes.size() - count) + " packets remain");
+ }
+ for(int delIDX=0;delIDX<count;delIDX++) {
+ final FrozenBufferedDeletes packet = deletes.get(delIDX);
+ numTerms.addAndGet(-packet.numTermDeletes);
+ assert numTerms.get() >= 0;
+ bytesUsed.addAndGet(-packet.bytesUsed);
+ assert bytesUsed.get() >= 0;
+ }
+ deletes.subList(0, count).clear();
+ }
+ }
+
+ // Delete by Term
+ private synchronized long applyTermDeletes(Iterable<Term> termsIter, SegmentReader reader) throws IOException {
+ long delCount = 0;
+ Fields fields = reader.fields();
+ if (fields == null) {
+ // This reader has no postings
+ return 0;
+ }
+
+ TermsEnum termsEnum = null;
+
+ String currentField = null;
+ DocsEnum docs = null;
+
+ assert checkDeleteTerm(null);
+
+ for (Term term : termsIter) {
+ // Since we visit terms sorted, we gain performance
+ // by re-using the same TermsEnum and seeking only
+ // forwards
+ if (term.field() != currentField) {
+ assert currentField == null || currentField.compareTo(term.field()) < 0;
+ currentField = term.field();
+ Terms terms = fields.terms(currentField);
+ if (terms != null) {
+ termsEnum = terms.iterator();
+ } else {
+ termsEnum = null;
+ }
+ }
+
+ if (termsEnum == null) {
+ continue;
+ }
+ assert checkDeleteTerm(term);
+
+ // System.out.println(" term=" + term);
+
+ if (termsEnum.seek(term.bytes(), false) == TermsEnum.SeekStatus.FOUND) {
+ DocsEnum docsEnum = termsEnum.docs(reader.getDeletedDocs(), docs);
+
+ if (docsEnum != null) {
+ while (true) {
+ final int docID = docsEnum.nextDoc();
+ if (docID == DocsEnum.NO_MORE_DOCS) {
+ break;
+ }
+ reader.deleteDocument(docID);
+ // TODO: we could/should change
+ // reader.deleteDocument to return boolean
+ // true if it did in fact delete, because here
+ // we could be deleting an already-deleted doc
+ // which makes this an upper bound:
+ delCount++;
+ }
+ }
+ }
+ }
+
+ return delCount;
+ }
+
+ public static class QueryAndLimit {
+ public final Query query;
+ public final int limit;
+ public QueryAndLimit(Query query, int limit) {
+ this.query = query;
+ this.limit = limit;
+ }
+ }
+
+ // Delete by query
+ private synchronized long applyQueryDeletes(Iterable<QueryAndLimit> queriesIter, SegmentReader reader) throws IOException {
+ long delCount = 0;
+ IndexSearcher searcher = new IndexSearcher(reader);
+ assert searcher.getTopReaderContext().isAtomic;
+ final AtomicReaderContext readerContext = (AtomicReaderContext) searcher.getTopReaderContext();
+ try {
+ for (QueryAndLimit ent : queriesIter) {
+ Query query = ent.query;
+ int limit = ent.limit;
+ Weight weight = query.weight(searcher);
+ Scorer scorer = weight.scorer(readerContext, Weight.ScorerContext.def());
+ if (scorer != null) {
+ while(true) {
+ int doc = scorer.nextDoc();
+ if (doc >= limit)
+ break;
+
+ reader.deleteDocument(doc);
+ // TODO: we could/should change
+ // reader.deleteDocument to return boolean
+ // true if it did in fact delete, because here
+ // we could be deleting an already-deleted doc
+ // which makes this an upper bound:
+ delCount++;
+ }
+ }
+ }
+ } finally {
+ searcher.close();
+ }
+
+ return delCount;
+ }
+
+ // used only by assert
+ private boolean checkDeleteTerm(Term term) {
+ if (term != null) {
+ assert lastDeleteTerm == null || term.compareTo(lastDeleteTerm) > 0: "lastTerm=" + lastDeleteTerm + " vs term=" + term;
+ }
+ lastDeleteTerm = term;
+ return true;
+ }
+
+ // only for assert
+ private boolean checkDeleteStats() {
+ int numTerms2 = 0;
+ long bytesUsed2 = 0;
+ for(FrozenBufferedDeletes packet : deletes) {
+ numTerms2 += packet.numTermDeletes;
+ bytesUsed2 += packet.bytesUsed;
+ }
+ assert numTerms2 == numTerms.get(): "numTerms2=" + numTerms2 + " vs " + numTerms.get();
+ assert bytesUsed2 == bytesUsed.get(): "bytesUsed2=" + bytesUsed2 + " vs " + bytesUsed;
+ return true;
+ }
+}
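
The generation bookkeeping above admits a compact invariant: a segment
whose bufferedDeletesGen is g has already had every packet with gen < g
applied, so a packet is prunable once its gen falls below the minimum
bufferedDeletesGen across all live segments. A minimal sketch of that
prune rule, assuming hypothetical Packet/Segment stand-ins for
FrozenBufferedDeletes and SegmentInfo:

    import java.util.ArrayList;
    import java.util.List;

    class DeleteGenPruneSketch {
      static class Packet { final long gen; Packet(long gen) { this.gen = gen; } }
      static class Segment { final long delGen; Segment(long delGen) { this.delGen = delGen; } }

      // Keep only packets some segment may still need (gen >= minGen):
      static List<Packet> prune(List<Packet> packets, List<Segment> segments) {
        long minGen = Long.MAX_VALUE;
        for (Segment s : segments) {
          minGen = Math.min(minGen, s.delGen);
        }
        List<Packet> kept = new ArrayList<Packet>();
        for (Packet p : packets) {
          if (p.gen >= minGen) {
            // Some segment with delGen <= p.gen has not seen p yet
            kept.add(p);
          }
        }
        return kept;
      }

      public static void main(String[] args) {
        List<Packet> packets = new ArrayList<Packet>();
        packets.add(new Packet(1));
        packets.add(new Packet(2));
        packets.add(new Packet(3));
        List<Segment> segments = new ArrayList<Segment>();
        segments.add(new Segment(3));
        segments.add(new Segment(2));
        // minGen == 2: the gen=1 packet is applied everywhere and is dropped
        System.out.println(prune(packets, segments).size());  // prints 2
      }
    }
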
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Tue Feb 22 01:00:39 2011
@@ -145,7 +145,7 @@ public class ConcurrentMergeScheduler ex
/**
* Called whenever the running merges have changed, to pause & unpause
* threads. This method sorts the merge threads by their merge size in
- * descending order and then pauses/unpauses threads from first to lsat --
+ * descending order and then pauses/unpauses threads from first to last --
* that way, smaller merges are guaranteed to run before larger ones.
*/
protected synchronized void updateMergeThreads() {
@@ -308,10 +308,31 @@ public class ConcurrentMergeScheduler ex
// pending merges, until it's empty:
while (true) {
+ synchronized(this) {
+ long startStallTime = 0;
+ while (mergeThreadCount() >= 1+maxMergeCount) {
+ startStallTime = System.currentTimeMillis();
+ if (verbose()) {
+ message(" too many merges; stalling...");
+ }
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ }
+
+ if (verbose()) {
+ if (startStallTime != 0) {
+ message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+ }
+ }
+ }
+
+
// TODO: we could be careful about which merges to do in
// the BG (eg maybe the "biggest" ones) vs FG, which
// merges to do first (the easiest ones?), etc.
-
MergePolicy.OneMerge merge = writer.getNextMerge();
if (merge == null) {
if (verbose())
@@ -326,32 +347,11 @@ public class ConcurrentMergeScheduler ex
boolean success = false;
try {
synchronized(this) {
- final MergeThread merger;
- long startStallTime = 0;
- while (mergeThreadCount() >= maxMergeCount) {
- startStallTime = System.currentTimeMillis();
- if (verbose()) {
- message(" too many merges; stalling...");
- }
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- }
-
- if (verbose()) {
- if (startStallTime != 0) {
- message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
- }
- message(" consider merge " + merge.segString(dir));
- }
-
- assert mergeThreadCount() < maxMergeCount;
+ message(" consider merge " + merge.segString(dir));
// OK to spawn a new merge thread to handle this
// merge:
- merger = getMergeThread(writer, merge);
+ final MergeThread merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
if (verbose()) {
message(" launch new thread [" + merger.getName() + "]");
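
The relocated stall logic above applies back-pressure before a merge is
even pulled from the writer, and the 1+maxMergeCount threshold leaves
room for the calling thread itself on top of maxMergeCount background
merge threads. A minimal sketch of that wait/notify pattern, with
hypothetical names rather than the real scheduler API:

    class MergeStallSketch {
      private int runningMerges;
      private final int maxMergeCount;

      MergeStallSketch(int maxMergeCount) {
        this.maxMergeCount = maxMergeCount;
      }

      // Block until there is room to take on one more merge:
      synchronized void stallIfTooManyMerges() throws InterruptedException {
        while (runningMerges >= 1 + maxMergeCount) {
          wait();  // woken by notifyAll() in mergeFinished()
        }
      }

      synchronized void mergeStarted() {
        runningMerges++;
      }

      synchronized void mergeFinished() {
        runningMerges--;
        notifyAll();  // release any thread stalled above
      }
    }
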
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DirectoryReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DirectoryReader.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DirectoryReader.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DirectoryReader.java Tue Feb 22 01:00:39 2011
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
@@ -36,8 +37,7 @@ import org.apache.lucene.store.LockObtai
import org.apache.lucene.index.codecs.CodecProvider;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
-
-import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close
+import org.apache.lucene.util.MapBackedSet;
/**
* An IndexReader which reads indexes with multiple segments.
@@ -70,6 +70,8 @@ class DirectoryReader extends IndexReade
// opened on a past IndexCommit:
private long maxIndexVersion;
+ private final boolean applyAllDeletes;
+
// static IndexReader open(final Directory directory, final IndexDeletionPolicy deletionPolicy, final IndexCommit commit, final boolean readOnly,
// final int termInfosIndexDivisor) throws CorruptIndexException, IOException {
// return open(directory, deletionPolicy, commit, readOnly, termInfosIndexDivisor, null);
@@ -106,6 +108,8 @@ class DirectoryReader extends IndexReade
} else {
this.codecs = codecs;
}
+ readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
+ applyAllDeletes = false;
// To reduce the chance of hitting FileNotFound
// (and having to retry), we open segments in
@@ -117,6 +121,7 @@ class DirectoryReader extends IndexReade
boolean success = false;
try {
readers[i] = SegmentReader.get(readOnly, sis.info(i), termInfosIndexDivisor);
+ readers[i].readerFinishedListeners = readerFinishedListeners;
success = true;
} finally {
if (!success) {
@@ -136,37 +141,50 @@ class DirectoryReader extends IndexReade
}
// Used by near real-time search
- DirectoryReader(IndexWriter writer, SegmentInfos infos, int termInfosIndexDivisor, CodecProvider codecs) throws IOException {
+ DirectoryReader(IndexWriter writer, SegmentInfos infos, int termInfosIndexDivisor, CodecProvider codecs, boolean applyAllDeletes) throws IOException {
this.directory = writer.getDirectory();
this.readOnly = true;
- segmentInfos = (SegmentInfos) infos.clone();// make sure we clone otherwise we share mutable state with IW
+ this.applyAllDeletes = applyAllDeletes; // saved for reopen
+
this.termInfosIndexDivisor = termInfosIndexDivisor;
if (codecs == null) {
this.codecs = CodecProvider.getDefault();
} else {
this.codecs = codecs;
}
+ readerFinishedListeners = writer.getReaderFinishedListeners();
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
// no need to process segments in reverse order
final int numSegments = infos.size();
- SegmentReader[] readers = new SegmentReader[numSegments];
+
+ List<SegmentReader> readers = new ArrayList<SegmentReader>();
final Directory dir = writer.getDirectory();
+ segmentInfos = (SegmentInfos) infos.clone();
+ int infosUpto = 0;
for (int i=0;i<numSegments;i++) {
boolean success = false;
try {
final SegmentInfo info = infos.info(i);
assert info.dir == dir;
- readers[i] = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor);
+ final SegmentReader reader = writer.readerPool.getReadOnlyClone(info, true, termInfosIndexDivisor);
+ if (reader.numDocs() > 0 || writer.getKeepFullyDeletedSegments()) {
+ reader.readerFinishedListeners = readerFinishedListeners;
+ readers.add(reader);
+ infosUpto++;
+ } else {
+ reader.close();
+ segmentInfos.remove(infosUpto);
+ }
success = true;
} finally {
if (!success) {
// Close all readers we had opened:
- for(i--;i>=0;i--) {
+ for(SegmentReader reader : readers) {
try {
- readers[i].close();
+ reader.close();
} catch (Throwable ignore) {
// keep going - we want to clean up as much as possible
}
@@ -177,16 +195,20 @@ class DirectoryReader extends IndexReade
this.writer = writer;
- initialize(readers);
+ initialize(readers.toArray(new SegmentReader[readers.size()]));
}
/** This constructor is only used for {@link #reopen()} */
DirectoryReader(Directory directory, SegmentInfos infos, SegmentReader[] oldReaders, int[] oldStarts,
- boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs) throws IOException {
+ boolean readOnly, boolean doClone, int termInfosIndexDivisor, CodecProvider codecs,
+ Collection<ReaderFinishedListener> readerFinishedListeners) throws IOException {
this.directory = directory;
this.readOnly = readOnly;
this.segmentInfos = infos;
this.termInfosIndexDivisor = termInfosIndexDivisor;
+ this.readerFinishedListeners = readerFinishedListeners;
+ applyAllDeletes = false;
+
if (codecs == null) {
this.codecs = CodecProvider.getDefault();
} else {
@@ -232,8 +254,10 @@ class DirectoryReader extends IndexReade
// this is a new reader; in case we hit an exception we can close it safely
newReader = SegmentReader.get(readOnly, infos.info(i), termInfosIndexDivisor);
+ newReader.readerFinishedListeners = readerFinishedListeners;
} else {
newReader = newReaders[i].reopenSegment(infos.info(i), doClone, readOnly);
+ assert newReader.readerFinishedListeners == readerFinishedListeners;
}
if (newReader == newReaders[i]) {
// this reader will be shared between the old and the new one,
@@ -357,6 +381,7 @@ class DirectoryReader extends IndexReade
writeLock = null;
hasChanges = false;
}
+ assert newReader.readerFinishedListeners != null;
return newReader;
}
@@ -391,7 +416,9 @@ class DirectoryReader extends IndexReade
// TODO: right now we *always* make a new reader; in
// the future we could have write make some effort to
// detect that no changes have occurred
- return writer.getReader();
+ IndexReader reader = writer.getReader(applyAllDeletes);
+ reader.readerFinishedListeners = readerFinishedListeners;
+ return reader;
}
private IndexReader doReopen(final boolean openReadOnly, IndexCommit commit) throws CorruptIndexException, IOException {
@@ -458,7 +485,7 @@ class DirectoryReader extends IndexReade
private synchronized DirectoryReader doReopen(SegmentInfos infos, boolean doClone, boolean openReadOnly) throws CorruptIndexException, IOException {
DirectoryReader reader;
- reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs);
+ reader = new DirectoryReader(directory, infos, subReaders, starts, openReadOnly, doClone, termInfosIndexDivisor, codecs, readerFinishedListeners);
return reader;
}
@@ -705,11 +732,18 @@ class DirectoryReader extends IndexReade
// case we have to roll back:
startCommit();
+ final SegmentInfos rollbackSegmentInfos = new SegmentInfos();
+ rollbackSegmentInfos.addAll(segmentInfos);
+
boolean success = false;
try {
for (int i = 0; i < subReaders.length; i++)
subReaders[i].commit();
+ // Remove segments that contain only 100% deleted
+ // docs:
+ segmentInfos.pruneDeletedSegments();
+
// Sync all files we just wrote
directory.sync(segmentInfos.files(directory, false));
segmentInfos.commit(directory);
@@ -729,6 +763,10 @@ class DirectoryReader extends IndexReade
// partially written .del files, etc, are
// removed):
deleter.refresh();
+
+ // Restore all SegmentInfos (in case we pruned some)
+ segmentInfos.clear();
+ segmentInfos.addAll(rollbackSegmentInfos);
}
}
@@ -805,11 +843,6 @@ class DirectoryReader extends IndexReade
}
}
- // NOTE: only needed in case someone had asked for
- // FieldCache for top-level reader (which is generally
- // not a good idea):
- FieldCache.DEFAULT.purge(this);
-
if (writer != null) {
// Since we just closed, writer may now be able to
// delete unused files:
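
The commit-time change above snapshots segmentInfos before pruning
fully-deleted segments, so a failure during sync/commit can restore the
original list. A generic sketch of that snapshot-and-restore pattern
(hypothetical names, not the Lucene API):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class CommitRollbackSketch {
      private final List<String> segments = new ArrayList<String>();

      void commit(Runnable syncAndWrite) {
        // Snapshot current state in case we have to roll back:
        final List<String> rollback = new ArrayList<String>(segments);
        boolean success = false;
        try {
          // Prune segments that contain only 100% deleted docs:
          Iterator<String> it = segments.iterator();
          while (it.hasNext()) {
            if (it.next().startsWith("allDeleted_")) {
              it.remove();
            }
          }
          syncAndWrite.run();  // sync files + write segments file; may throw
          success = true;
        } finally {
          if (!success) {
            // Restore all segments (in case we pruned some):
            segments.clear();
            segments.addAll(rollback);
          }
        }
      }
    }
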
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocConsumer.java Tue Feb 22 01:00:39 2011
@@ -20,9 +20,10 @@ package org.apache.lucene.index;
import java.io.IOException;
abstract class DocConsumer {
- abstract void processDocument() throws IOException;
+ abstract void processDocument(FieldInfos fieldInfos) throws IOException;
abstract void finishDocument() throws IOException;
abstract void flush(final SegmentWriteState state) throws IOException;
abstract void abort();
abstract boolean freeRAM();
+ abstract void doAfterFlush();
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumer.java Tue Feb 22 01:00:39 2011
@@ -21,9 +21,6 @@ import java.io.IOException;
import java.util.Map;
abstract class DocFieldConsumer {
-
- FieldInfos fieldInfos;
-
/** Called when DocumentsWriter decides to create a new
* segment */
abstract void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException;
@@ -42,7 +39,4 @@ abstract class DocFieldConsumer {
abstract void finishDocument() throws IOException;
- void setFieldInfos(FieldInfos fieldInfos) {
- this.fieldInfos = fieldInfos;
- }
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldConsumers.java Tue Feb 22 01:00:39 2011
@@ -36,13 +36,6 @@ final class DocFieldConsumers extends Do
}
@Override
- void setFieldInfos(FieldInfos fieldInfos) {
- super.setFieldInfos(fieldInfos);
- one.setFieldInfos(fieldInfos);
- two.setFieldInfos(fieldInfos);
- }
-
- @Override
public void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
Map<FieldInfo, DocFieldConsumerPerField> oneFieldsToFlush = new HashMap<FieldInfo, DocFieldConsumerPerField>();
@@ -79,16 +72,16 @@ final class DocFieldConsumers extends Do
try {
one.finishDocument();
} finally {
- two.finishDocument();
+ two.finishDocument();
}
}
-
+
@Override
public void startDocument() throws IOException {
one.startDocument();
two.startDocument();
}
-
+
@Override
public DocFieldConsumerPerField addField(FieldInfo fi) {
return new DocFieldConsumersPerField(this, fi, one.addField(fi), two.addField(fi));
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocFieldProcessor.java Tue Feb 22 01:00:39 2011
@@ -38,7 +38,6 @@ import org.apache.lucene.document.Fielda
final class DocFieldProcessor extends DocConsumer {
- final FieldInfos fieldInfos;
final DocFieldConsumer consumer;
final StoredFieldsWriter fieldsWriter;
@@ -60,9 +59,7 @@ final class DocFieldProcessor extends Do
public DocFieldProcessor(DocumentsWriterPerThread docWriter, DocFieldConsumer consumer) {
this.docState = docWriter.docState;
this.consumer = consumer;
- fieldInfos = docWriter.getFieldInfos();
- consumer.setFieldInfos(fieldInfos);
- fieldsWriter = new StoredFieldsWriter(docWriter, fieldInfos);
+ fieldsWriter = new StoredFieldsWriter(docWriter);
}
@Override
@@ -73,7 +70,6 @@ final class DocFieldProcessor extends Do
for (DocFieldConsumerPerField f : fields) {
childFields.put(f.getFieldInfo(), f);
}
- trimFields(state);
fieldsWriter.flush(state);
consumer.flush(childFields, state);
@@ -83,7 +79,7 @@ final class DocFieldProcessor extends Do
// FreqProxTermsWriter does this with
// FieldInfo.storePayload.
final String fileName = IndexFileNames.segmentFileName(state.segmentName, "", IndexFileNames.FIELD_INFOS_EXTENSION);
- fieldInfos.write(state.directory, fileName);
+ state.fieldInfos.write(state.directory, fileName);
}
@Override
@@ -122,43 +118,13 @@ final class DocFieldProcessor extends Do
return fields;
}
- /** If there are fields we've seen but did not see again
- * in the last run, then free them up. */
-
- void trimFields(SegmentWriteState state) {
-
- for(int i=0;i<fieldHash.length;i++) {
- DocFieldProcessorPerField perField = fieldHash[i];
- DocFieldProcessorPerField lastPerField = null;
-
- while (perField != null) {
-
- if (perField.lastGen == -1) {
-
- // This field was not seen since the previous
- // flush, so, free up its resources now
-
- // Unhash
- if (lastPerField == null)
- fieldHash[i] = perField.next;
- else
- lastPerField.next = perField.next;
-
- if (state.infoStream != null) {
- state.infoStream.println(" purge field=" + perField.fieldInfo.name);
- }
-
- totalFieldCount--;
-
- } else {
- // Reset
- perField.lastGen = -1;
- lastPerField = perField;
- }
-
- perField = perField.next;
- }
- }
+ /** In flush we reset the fieldHash to not maintain per-field state
+ * across segments */
+ @Override
+ void doAfterFlush() {
+ fieldHash = new DocFieldProcessorPerField[2];
+ hashMask = 1;
+ totalFieldCount = 0;
}
private void rehash() {
@@ -185,7 +151,7 @@ final class DocFieldProcessor extends Do
}
@Override
- public void processDocument() throws IOException {
+ public void processDocument(FieldInfos fieldInfos) throws IOException {
consumer.startDocument();
fieldsWriter.startDocument();
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocInverter.java Tue Feb 22 01:00:39 2011
@@ -66,13 +66,6 @@ final class DocInverter extends DocField
}
@Override
- void setFieldInfos(FieldInfos fieldInfos) {
- super.setFieldInfos(fieldInfos);
- consumer.setFieldInfos(fieldInfos);
- endConsumer.setFieldInfos(fieldInfos);
- }
-
- @Override
void flush(Map<FieldInfo, DocFieldConsumerPerField> fieldsToFlush, SegmentWriteState state) throws IOException {
Map<FieldInfo, InvertedDocConsumerPerField> childFieldsToFlush = new HashMap<FieldInfo, InvertedDocConsumerPerField>();
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Feb 22 01:00:39 2011
@@ -33,6 +33,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.SimilarityProvider;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BitVector;
/**
* This class accepts multiple added documents and directly
@@ -126,24 +127,21 @@ final class DocumentsWriter {
// non-zero we will flush by RAM usage instead.
private int maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
- private final FieldInfos fieldInfos;
-
- final BufferedDeletes bufferedDeletes;
- SegmentDeletes pendingDeletes;
+ final BufferedDeletesStream bufferedDeletesStream;
+ // TODO: cutover to BytesRefHash
+ private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
final IndexingChain chain;
final DocumentsWriterPerThreadPool perThreadPool;
- DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterPerThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletes bufferedDeletes) throws IOException {
+ DocumentsWriter(Directory directory, IndexWriter writer, IndexingChain chain, DocumentsWriterPerThreadPool indexerThreadPool, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
this.directory = directory;
this.indexWriter = writer;
this.similarityProvider = writer.getConfig().getSimilarityProvider();
- this.fieldInfos = fieldInfos;
- this.bufferedDeletes = bufferedDeletes;
+ this.bufferedDeletesStream = bufferedDeletesStream;
this.perThreadPool = indexerThreadPool;
- this.pendingDeletes = new SegmentDeletes();
this.chain = chain;
- this.perThreadPool.initialize(this);
+ this.perThreadPool.initialize(this, fieldInfos);
}
boolean deleteQueries(final Query... queries) throws IOException {
@@ -164,7 +162,7 @@ final class DocumentsWriter {
if (!deleted) {
synchronized(this) {
for (Query query : queries) {
- pendingDeletes.addQuery(query, SegmentDeletes.MAX_INT);
+ pendingDeletes.addQuery(query, BufferedDeletes.MAX_INT);
}
}
}
@@ -194,7 +192,7 @@ final class DocumentsWriter {
if (!deleted) {
synchronized(this) {
for (Term term : terms) {
- pendingDeletes.addTerm(term, SegmentDeletes.MAX_INT);
+ pendingDeletes.addTerm(term, BufferedDeletes.MAX_INT);
}
}
}
@@ -202,6 +200,9 @@ final class DocumentsWriter {
return false;
}
+ // TODO: we could check w/ FreqProxTermsWriter: if the
+ // term doesn't exist, don't bother buffering into the
+ // per-DWPT map (but it still must go into the global map)
boolean deleteTerm(final Term term) throws IOException {
return deleteTerms(term);
}
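
To make the buffering contract above concrete: each delete is recorded together with the highest docID it applies to, and BufferedDeletes.MAX_INT marks a delete that covers every document buffered in the segment. A minimal stand-alone sketch, assuming a plain map in place of the real per-term bookkeeping (the class and field names below are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch, not the real BufferedDeletes class.
    class BufferedDeletesSketch {
      static final int MAX_INT = Integer.MAX_VALUE;
      // term -> highest docID the delete applies to
      final Map<String,Integer> terms = new HashMap<String,Integer>();

      void addTerm(String term, int docIDUpto) {
        Integer current = terms.get(term);
        if (current == null || docIDUpto > current.intValue()) {
          // a later delete for the same term widens its reach;
          // MAX_INT marks a delete covering every buffered doc
          terms.put(term, Integer.valueOf(docIDUpto));
        }
      }
    }
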
@@ -226,16 +227,6 @@ final class DocumentsWriter {
return deleted;
}
- public FieldInfos getFieldInfos() {
- return fieldInfos;
- }
-
- /** Returns true if any of the fields in the current
- * buffered docs have omitTermFreqAndPositions==false */
- boolean hasProx() {
- return fieldInfos.hasProx();
- }
-
/** If non-null, various details of indexing are printed
* here. */
synchronized void setInfoStream(PrintStream infoStream) {
@@ -396,7 +387,8 @@ final class DocumentsWriter {
ensureOpen();
SegmentInfo newSegment = null;
- SegmentDeletes segmentDeletes = null;
+ BufferedDeletes segmentDeletes = null;
+ BitVector deletedDocs = null;
ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this, doc);
try {
@@ -407,10 +399,10 @@ final class DocumentsWriter {
newSegment = finishAddDocument(dwpt, perThreadRAMUsedBeforeAdd);
if (newSegment != null) {
- fieldInfos.update(dwpt.getFieldInfos());
+ deletedDocs = dwpt.flushState.deletedDocs;
if (dwpt.pendingDeletes.any()) {
segmentDeletes = dwpt.pendingDeletes;
- dwpt.pendingDeletes = new SegmentDeletes();
+ dwpt.pendingDeletes = new BufferedDeletes(false);
}
}
} finally {
@@ -423,7 +415,7 @@ final class DocumentsWriter {
if (newSegment != null) {
perThreadPool.clearThreadBindings(perThread);
- indexWriter.addFlushedSegment(newSegment);
+ indexWriter.addFlushedSegment(newSegment, deletedDocs);
return true;
}
@@ -460,17 +452,23 @@ final class DocumentsWriter {
}
}
- private final void pushDeletes(SegmentInfo segmentInfo, SegmentDeletes segmentDeletes) {
+ private final void pushDeletes(SegmentInfo segmentInfo, BufferedDeletes segmentDeletes) {
synchronized(indexWriter) {
// Lock order: DW -> BD
+ final long delGen = bufferedDeletesStream.getNextGen();
if (segmentDeletes.any()) {
- if (segmentInfo != null) {
- bufferedDeletes.pushDeletes(segmentDeletes, segmentInfo);
- } else if (indexWriter.segmentInfos.size() > 0) {
+ if (indexWriter.segmentInfos.size() > 0 || segmentInfo != null) {
+ final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(segmentDeletes, delGen);
+ if (infoStream != null) {
+ message("flush: push buffered deletes");
+ }
+ bufferedDeletesStream.push(packet);
if (infoStream != null) {
- message("flush: push buffered deletes to previously flushed segment " + indexWriter.segmentInfos.lastElement());
+ message("flush: delGen=" + packet.gen);
+ }
+ if (segmentInfo != null) {
+ segmentInfo.setBufferedDeletesGen(packet.gen);
}
- bufferedDeletes.pushDeletes(segmentDeletes, indexWriter.segmentInfos.lastElement(), true);
} else {
if (infoStream != null) {
message("flush: drop buffered deletes: no segments");
@@ -479,6 +477,8 @@ final class DocumentsWriter {
// there are no segments, the deletions cannot
// affect anything.
}
+ } else if (segmentInfo != null) {
+ segmentInfo.setBufferedDeletesGen(delGen);
}
}
}
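
The restructured pushDeletes above hinges on delete generations: every pushed FrozenBufferedDeletes packet draws a monotonically increasing gen from the stream, and a segment flushed alongside the packet records that same gen, so the packet is never re-applied to its own segment. A minimal sketch of the counter, with hypothetical names:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical sketch of the delete-generation counter.
    class DeleteGenSketch {
      private final AtomicLong nextGen = new AtomicLong(1);

      // Each pushed packet draws a fresh gen; a segment that records
      // gen G only has packets with gen > G applied to it later.
      long getNextGen() {
        return nextGen.getAndIncrement();
      }
    }
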
@@ -489,7 +489,7 @@ final class DocumentsWriter {
if (flushDeletes) {
synchronized (this) {
pushDeletes(null, pendingDeletes);
- pendingDeletes = new SegmentDeletes();
+ pendingDeletes = new BufferedDeletes(false);
}
}
@@ -498,7 +498,8 @@ final class DocumentsWriter {
while (threadsIterator.hasNext()) {
SegmentInfo newSegment = null;
- SegmentDeletes segmentDeletes = null;
+ BufferedDeletes segmentDeletes = null;
+ BitVector deletedDocs = null;
ThreadState perThread = threadsIterator.next();
perThread.lock();
@@ -519,17 +520,17 @@ final class DocumentsWriter {
newSegment = dwpt.flush();
if (newSegment != null) {
- fieldInfos.update(dwpt.getFieldInfos());
anythingFlushed = true;
+ deletedDocs = dwpt.flushState.deletedDocs;
perThreadPool.clearThreadBindings(perThread);
if (dwpt.pendingDeletes.any()) {
segmentDeletes = dwpt.pendingDeletes;
- dwpt.pendingDeletes = new SegmentDeletes();
+ dwpt.pendingDeletes = new BufferedDeletes(false);
}
}
} else if (flushDeletes && dwpt.pendingDeletes.any()) {
segmentDeletes = dwpt.pendingDeletes;
- dwpt.pendingDeletes = new SegmentDeletes();
+ dwpt.pendingDeletes = new BufferedDeletes(false);
}
} finally {
perThread.unlock();
@@ -543,7 +544,7 @@ final class DocumentsWriter {
if (newSegment != null) {
// important to unlock the perThread before finishFlushedSegment
// is called to prevent deadlock on IndexWriter mutex
- indexWriter.addFlushedSegment(newSegment);
+ indexWriter.addFlushedSegment(newSegment, deletedDocs);
}
}
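
The comment above encodes a lock-ordering rule: the per-thread lock must be released before calling indexWriter.addFlushedSegment, otherwise one thread could hold the DWPT lock while waiting for the IndexWriter mutex as another thread does the reverse. A minimal sketch of that ordering, using a hypothetical harness around java.util.concurrent locks:

    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical harness; not the real DocumentsWriter code.
    class FlushHandoffSketch {
      private final ReentrantLock perThreadLock = new ReentrantLock();

      void flushAndPublish(Runnable publishToIndexWriter) {
        Runnable pending = null;
        perThreadLock.lock();
        try {
          // ... flush the per-thread segment while locked ...
          pending = publishToIndexWriter;
        } finally {
          perThreadLock.unlock();  // release before touching IndexWriter
        }
        if (pending != null) {
          pending.run();  // safe: the per-thread lock is no longer held
        }
      }
    }
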
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue Feb 22 01:00:39 2011
@@ -30,6 +30,7 @@ import org.apache.lucene.document.Docume
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SimilarityProvider;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
import org.apache.lucene.util.RamUsageEstimator;
@@ -81,7 +82,7 @@ public class DocumentsWriterPerThread {
};
// Deletes for our still-in-RAM (to be flushed next) segment
- SegmentDeletes pendingDeletes = new SegmentDeletes();
+ BufferedDeletes pendingDeletes = new BufferedDeletes(false);
static class DocState {
final DocumentsWriterPerThread docWriter;
@@ -147,13 +148,13 @@ public class DocumentsWriterPerThread {
final AtomicLong bytesUsed = new AtomicLong(0);
- private final FieldInfos fieldInfos;
+ private FieldInfos fieldInfos;
- public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
+ public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, FieldInfos fieldInfos, IndexingChain indexingChain) {
this.directory = directory;
this.parent = parent;
+ this.fieldInfos = fieldInfos.newFieldInfosWithGlobalFieldNumberMap();
this.writer = parent.indexWriter;
- this.fieldInfos = new FieldInfos();
this.infoStream = parent.indexWriter.getInfoStream();
this.docState = new DocState(this);
this.docState.similarityProvider = parent.indexWriter.getConfig().getSimilarityProvider();
@@ -185,7 +186,7 @@ public class DocumentsWriterPerThread {
boolean success = false;
try {
- consumer.processDocument();
+ consumer.processDocument(fieldInfos);
success = true;
} finally {
@@ -244,13 +245,6 @@ public class DocumentsWriterPerThread {
return numDocsInRAM;
}
- /** Returns true if any of the fields in the current
- * buffered docs have omitTermFreqAndPositions==false */
- boolean hasProx() {
- return (docFieldProcessor != null) ? docFieldProcessor.fieldInfos.hasProx()
- : true;
- }
-
SegmentCodecs getCodec() {
return flushState.segmentCodecs;
}
@@ -258,6 +252,8 @@ public class DocumentsWriterPerThread {
/** Reset after a flush */
private void doAfterFlush() throws IOException {
segment = null;
+ consumer.doAfterFlush();
+ fieldInfos = fieldInfos.newFieldInfosWithGlobalFieldNumberMap();
parent.substractFlushedNumDocs(numDocsInRAM);
numDocsInRAM = 0;
}
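
The call to newFieldInfosWithGlobalFieldNumberMap() above implies that each DocumentsWriterPerThread keeps a private FieldInfos while all threads share one global field-name-to-number map, so concurrently built segments agree on field numbering. A minimal sketch of such a shared registry, assuming names that do not appear in the diff:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of a shared field-number registry.
    class GlobalFieldNumbersSketch {
      private final Map<String,Integer> nameToNumber = new HashMap<String,Integer>();

      synchronized int addOrGet(String fieldName) {
        Integer num = nameToNumber.get(fieldName);
        if (num == null) {
          num = Integer.valueOf(nameToNumber.size());  // next free number
          nameToNumber.put(fieldName, num);
        }
        return num.intValue();
      }
    }
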
@@ -268,7 +264,19 @@ public class DocumentsWriterPerThread {
flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
numDocsInRAM, writer.getConfig().getTermIndexInterval(),
- SegmentCodecs.build(fieldInfos, writer.codecs));
+ SegmentCodecs.build(fieldInfos, writer.codecs), pendingDeletes);
+
+ // Apply delete-by-docID now (delete-by-docID only
+ // happens when an exception is hit processing that
+ // doc, e.g. if the analyzer has some problem w/ the text):
+ if (pendingDeletes.docIDs.size() > 0) {
+ flushState.deletedDocs = new BitVector(numDocsInRAM);
+ for(int delDocID : pendingDeletes.docIDs) {
+ flushState.deletedDocs.set(delDocID);
+ }
+ pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
+ pendingDeletes.docIDs.clear();
+ }
if (infoStream != null) {
message("flush postings as segment " + flushState.segmentName + " numDocs=" + numDocsInRAM);
@@ -285,11 +293,12 @@ public class DocumentsWriterPerThread {
try {
- SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, fieldInfos.hasProx(), flushState.segmentCodecs, false);
+ SegmentInfo newSegment = new SegmentInfo(segment, flushState.numDocs, directory, false, flushState.segmentCodecs, fieldInfos);
consumer.flush(flushState);
- newSegment.setHasVectors(flushState.hasVectors);
+ newSegment.clearFilesCache();
if (infoStream != null) {
+ message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
message("flushedFiles=" + newSegment.files());
message("flushed codecs=" + newSegment.getSegmentCodecs());
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java Tue Feb 22 01:00:39 2011
@@ -24,9 +24,9 @@ public abstract class DocumentsWriterPer
numThreadStatesActive = 0;
}
- public void initialize(DocumentsWriter documentsWriter) {
+ public void initialize(DocumentsWriter documentsWriter, FieldInfos fieldInfos) {
for (int i = 0; i < perThreads.length; i++) {
- perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, documentsWriter.chain));
+ perThreads[i] = new ThreadState(new DocumentsWriterPerThread(documentsWriter.directory, documentsWriter, fieldInfos, documentsWriter.chain));
}
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfo.java?rev=1073192&r1=1073191&r2=1073192&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfo.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/FieldInfo.java Tue Feb 22 01:00:39 2011
@@ -32,7 +32,7 @@ public final class FieldInfo {
public boolean omitTermFreqAndPositions;
public boolean storePayloads; // whether this field stores payloads together with term positions
- int codecId = 0; // set inside SegmentCodecs#build() during segment flush - this is used to identify the codec used to write this field
+ private int codecId = -1; // set inside SegmentCodecs#build() during segment flush - this is used to identify the codec used to write this field
FieldInfo(String na, boolean tk, int nu, boolean storeTermVector,
boolean storePositionWithTermVector, boolean storeOffsetWithTermVector,
@@ -57,10 +57,21 @@ public final class FieldInfo {
}
}
+ public void setCodecId(int codecId) {
+ assert this.codecId == -1 : "CodecId can only be set once.";
+ this.codecId = codecId;
+ }
+
+ public int getCodecId() {
+ return codecId;
+ }
+
@Override
public Object clone() {
- return new FieldInfo(name, isIndexed, number, storeTermVector, storePositionWithTermVector,
+ FieldInfo clone = new FieldInfo(name, isIndexed, number, storeTermVector, storePositionWithTermVector,
storeOffsetWithTermVector, omitNorms, storePayloads, omitTermFreqAndPositions);
+ clone.codecId = this.codecId;
+ return clone;
}
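
The accessors above establish a write-once contract for codecId: it starts at -1 (unset), is assigned exactly once inside SegmentCodecs.build(), and the amended clone() now carries it over, where the old clone silently dropped the codec binding. A minimal stand-alone sketch of the pattern, with a hypothetical class name:

    // Hypothetical stand-alone version of the write-once pattern.
    class CodecIdSketch {
      private int codecId = -1;  // -1 means "not assigned yet"

      void setCodecId(int codecId) {
        assert this.codecId == -1 : "CodecId can only be set once.";
        this.codecId = codecId;
      }

      int getCodecId() {
        return codecId;
      }

      @Override
      public CodecIdSketch clone() {
        CodecIdSketch copy = new CodecIdSketch();
        copy.codecId = this.codecId;  // preserve the codec binding
        return copy;
      }
    }
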
void update(boolean isIndexed, boolean storeTermVector, boolean storePositionWithTermVector,