Posted to commits@lucene.apache.org by rm...@apache.org on 2011/08/23 16:07:19 UTC
svn commit: r1160700 [6/22] - in /lucene/dev/branches/flexscoring: ./
dev-tools/eclipse/ dev-tools/idea/.idea/ dev-tools/idea/lucene/contrib/
dev-tools/idea/lucene/contrib/demo/
dev-tools/idea/lucene/contrib/highlighter/ dev-tools/idea/lucene/contrib/q...
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue Aug 23 14:06:58 2011
@@ -18,6 +18,7 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -339,7 +340,11 @@ public final class DocumentsWriterFlushC
* Returns the number of delete terms in the global pool
*/
public int getNumGlobalTermDeletes() {
- return documentsWriter.deleteQueue.numGlobalTermDeletes();
+ return documentsWriter.deleteQueue.numGlobalTermDeletes() + documentsWriter.indexWriter.bufferedDeletesStream.numTerms();
+ }
+
+ public long getDeleteBytesUsed() {
+ return documentsWriter.deleteQueue.bytesUsed() + documentsWriter.indexWriter.bufferedDeletesStream.bytesUsed();
}
synchronized int numFlushingDWPT() {
@@ -358,10 +363,25 @@ public final class DocumentsWriterFlushC
return this.perThreadPool.getActiveThreadState();
}
+ ThreadState obtainAndLock() {
+ final ThreadState perThread = perThreadPool.getAndLock(Thread
+ .currentThread(), documentsWriter);
+ if (perThread.isActive()
+ && perThread.perThread.deleteQueue != documentsWriter.deleteQueue) {
+ // There is a flush-all in process and this DWPT is
+ // now stale -- enroll it for flush and try for
+ // another DWPT:
+ addFlushableState(perThread);
+ }
+ // simply return the ThreadState even in a flush-all case since we already hold the lock
+ return perThread;
+ }
+
void markForFullFlush() {
final DocumentsWriterDeleteQueue flushingQueue;
synchronized (this) {
- assert !fullFlush;
+ assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
+ assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;
fullFlush = true;
flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until
@@ -369,9 +389,7 @@ public final class DocumentsWriterFlushC
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
documentsWriter.deleteQueue = newQueue;
}
- final Iterator<ThreadState> allActiveThreads = perThreadPool
- .getActivePerThreadsIterator();
- final ArrayList<DocumentsWriterPerThread> toFlush = new ArrayList<DocumentsWriterPerThread>();
+ final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
while (allActiveThreads.hasNext()) {
final ThreadState next = allActiveThreads.next();
next.lock();
@@ -391,25 +409,7 @@ public final class DocumentsWriterFlushC
// this one is already a new DWPT
continue;
}
- if (next.perThread.getNumDocsInRAM() > 0 ) {
- final DocumentsWriterPerThread dwpt = next.perThread; // just for assert
- synchronized (this) {
- if (!next.flushPending) {
- setFlushPending(next);
- }
- final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
- assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
- assert dwpt == flushingDWPT : "flushControl returned different DWPT";
- toFlush.add(flushingDWPT);
- }
- } else {
- if (closed) {
- next.resetWriter(null); // make this state inactive
- } else {
- // get the new delete queue from DW
- next.perThread.initialize();
- }
- }
+ addFlushableState(next);
} finally {
next.unlock();
}
@@ -421,9 +421,55 @@ public final class DocumentsWriterFlushC
* blocking indexing.*/
pruneBlockedQueue(flushingQueue);
assert assertBlockedFlushes(documentsWriter.deleteQueue);
- flushQueue.addAll(toFlush);
+ flushQueue.addAll(fullFlushBuffer);
+ fullFlushBuffer.clear();
stallControl.updateStalled(this);
}
+ assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
+ }
+
+ private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
+ final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
+ while (allActiveThreads.hasNext()) {
+ final ThreadState next = allActiveThreads.next();
+ next.lock();
+ try {
+ assert !next.isActive() || next.perThread.deleteQueue == queue;
+ } finally {
+ next.unlock();
+ }
+ }
+ return true;
+ }
+
+ private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
+
+ void addFlushableState(ThreadState perThread) {
+ if (documentsWriter.infoStream != null) {
+ documentsWriter.message("FC: " + Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread);
+ }
+ final DocumentsWriterPerThread dwpt = perThread.perThread;
+ assert perThread.isHeldByCurrentThread();
+ assert perThread.isActive();
+ assert fullFlush;
+ assert dwpt.deleteQueue != documentsWriter.deleteQueue;
+ if (dwpt.getNumDocsInRAM() > 0) {
+ synchronized(this) {
+ if (!perThread.flushPending) {
+ setFlushPending(perThread);
+ }
+ final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
+ assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+ assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+ fullFlushBuffer.add(flushingDWPT);
+ }
+ } else {
+ if (closed) {
+ perThread.resetWriter(null); // make this state inactive
+ } else {
+ dwpt.initialize();
+ }
+ }
}
/**
@@ -498,7 +544,7 @@ public final class DocumentsWriterFlushC
/**
* Returns <code>true</code> if a full flush is currently running
*/
- synchronized boolean isFullFlush() { // used by assert
+ synchronized boolean isFullFlush() {
return fullFlush;
}
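
The obtainAndLock()/markForFullFlush() changes above hinge on comparing a DWPT's delete queue against the writer's current queue: once markForFullFlush() swaps in a new queue, any thread state still holding the old queue is stale and is enrolled for flushing via addFlushableState(). A minimal, self-contained sketch of that queue-identity check (class and method names below are illustrative, not the actual Lucene classes):

    import java.util.ArrayList;
    import java.util.List;

    // Minimal model of the stale-queue check used during a full flush.
    class FullFlushSketch {

      static final class DeleteQueue {
        final long generation;
        DeleteQueue(long generation) { this.generation = generation; }
      }

      static final class WorkerState {
        DeleteQueue queue;
        int docsInRAM;
        WorkerState(DeleteQueue queue) { this.queue = queue; }
      }

      private volatile DeleteQueue currentQueue = new DeleteQueue(0);
      private final List<WorkerState> fullFlushBuffer = new ArrayList<WorkerState>();

      // Analogue of the check in obtainAndLock(): a worker whose queue no longer
      // matches the global queue is stale -- a full flush swapped the queue under it.
      boolean isStale(WorkerState state) {
        return state.queue != currentQueue;
      }

      // Analogue of markForFullFlush(): swap in a new queue, then enroll every
      // worker that still buffers documents; workers with nothing buffered are
      // simply re-pointed at the new queue.
      synchronized void markForFullFlush(List<WorkerState> activeStates) {
        currentQueue = new DeleteQueue(currentQueue.generation + 1);
        for (WorkerState state : activeStates) {
          if (state.docsInRAM > 0) {
            fullFlushBuffer.add(state);   // flushed later, outside this lock
          } else {
            state.queue = currentQueue;   // nothing buffered: just re-point it
          }
        }
      }
    }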
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue Aug 23 14:06:58 2011
@@ -47,7 +47,7 @@ public class DocumentsWriterPerThread {
abstract static class IndexingChain {
abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
}
-
+
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@@ -131,7 +131,7 @@ public class DocumentsWriterPerThread {
hasAborted = aborting = true;
try {
if (infoStream != null) {
- message("docWriter: now abort");
+ message("now abort");
}
try {
consumer.abort();
@@ -146,11 +146,11 @@ public class DocumentsWriterPerThread {
} finally {
aborting = false;
if (infoStream != null) {
- message("docWriter: done abort");
+ message("done abort");
}
}
}
-
+ private final static boolean INFO_VERBOSE = false;
final DocumentsWriter parent;
final IndexWriter writer;
final Directory directory;
@@ -188,7 +188,7 @@ public class DocumentsWriterPerThread {
bytesUsed = new AtomicLong(0);
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
consumer = indexingChain.getChain(this);
- pendingDeletes = new BufferedDeletes(false);
+ pendingDeletes = new BufferedDeletes();
initialize();
}
@@ -223,8 +223,14 @@ public class DocumentsWriterPerThread {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
+ if (INFO_VERBOSE) {
+ message(Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
+ }
+
+ }
+ if (INFO_VERBOSE) {
+ message(Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segment);
}
-
boolean success = false;
try {
try {
@@ -265,8 +271,13 @@ public class DocumentsWriterPerThread {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
+ if (INFO_VERBOSE) {
+ message(Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
+ }
+ }
+ if (INFO_VERBOSE) {
+ message(Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segment);
}
-
int docCount = 0;
try {
for(Document doc : docs) {
@@ -476,7 +487,7 @@ public class DocumentsWriterPerThread {
segmentDeletes = null;
} else {
segmentDeletes = pendingDeletes;
- pendingDeletes = new BufferedDeletes(false);
+ pendingDeletes = new BufferedDeletes();
}
if (infoStream != null) {
@@ -552,4 +563,11 @@ public class DocumentsWriterPerThread {
this.infoStream = infoStream;
docState.infoStream = infoStream;
}
+
+ @Override
+ public String toString() {
+ return "DocumentsWriterPerThread [pendingDeletes=" + pendingDeletes
+ + ", segment=" + segment + ", aborting=" + aborting + ", numDocsInRAM="
+ + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
+ }
}
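
The new INFO_VERBOSE constant above guards the extra per-document messages with a static final boolean, so the logging branch is compiled away entirely when the flag is false. A minimal sketch of the same guard pattern (names below are invented for illustration):

    class VerboseLoggingSketch {
      private static final boolean INFO_VERBOSE = false; // flip to true while debugging

      private void message(String msg) {
        System.out.println(Thread.currentThread().getName() + ": " + msg);
      }

      void updateDocument(String segment, int docId) {
        if (INFO_VERBOSE) {
          // Dead code when INFO_VERBOSE == false; javac drops the branch.
          message("update docID=" + docId + " seg=" + segment);
        }
        // ... actual indexing work would go here ...
      }
    }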
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FlushByRamOrCountsPolicy.java Tue Aug 23 14:06:58 2011
@@ -60,15 +60,11 @@ public class FlushByRamOrCountsPolicy ex
}
}
final DocumentsWriter writer = this.writer.get();
- // If deletes alone are consuming > 1/2 our RAM
- // buffer, force them all to apply now. This is to
- // prevent too-frequent flushing of a long tail of
- // tiny segments:
if ((flushOnRAM() &&
- writer.deleteQueue.bytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()/2))) {
+ control.getDeleteBytesUsed() > (1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) {
control.setApplyAllDeletes();
if (writer.infoStream != null) {
- writer.message("force apply deletes bytesUsed=" + writer.deleteQueue.bytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
+ writer.message("force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
}
}
}
@@ -82,8 +78,12 @@ public class FlushByRamOrCountsPolicy ex
control.setFlushPending(state);
} else if (flushOnRAM()) {// flush by RAM
final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
- final long totalRam = control.activeBytes();
+ final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
if (totalRam >= limit) {
+ final DocumentsWriter writer = this.writer.get();
+ if (writer.infoStream != null) {
+ writer.message("flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
+ }
markLargestWriterPending(control, state, totalRam);
}
}
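
With this change the flush-by-RAM trigger accounts for buffered delete bytes as well as active indexing bytes, and deletes alone exceeding the full RAM buffer force an apply-all-deletes. A simplified sketch of the two decisions (field and method names are illustrative, not the actual FlushPolicy API):

    class FlushByRamSketch {
      private final double ramBufferSizeMB;

      FlushByRamSketch(double ramBufferSizeMB) { this.ramBufferSizeMB = ramBufferSizeMB; }

      // Mirrors the updated trigger: active bytes plus delete bytes vs. the buffer.
      boolean shouldFlushByRam(long activeBytes, long deleteBytesUsed) {
        final long limit = (long) (ramBufferSizeMB * 1024.d * 1024.d);
        return activeBytes + deleteBytesUsed >= limit;
      }

      // Deletes alone consuming the whole buffer: apply them now rather than
      // flushing a long tail of tiny segments.
      boolean shouldForceApplyDeletes(long deleteBytesUsed) {
        return deleteBytesUsed > (long) (1024 * 1024 * ramBufferSizeMB);
      }
    }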
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FreqProxTermsWriter.java Tue Aug 23 14:06:58 2011
@@ -26,6 +26,7 @@ import org.apache.lucene.index.FieldInfo
import org.apache.lucene.index.codecs.FieldsConsumer;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CollectionUtil;
+import org.apache.lucene.util.IOUtils;
final class FreqProxTermsWriter extends TermsHashConsumer {
@@ -58,6 +59,8 @@ final class FreqProxTermsWriter extends
final FieldsConsumer consumer = state.segmentCodecs.codec().fieldsConsumer(state);
+ boolean success = false;
+
try {
TermsHash termsHash = null;
@@ -100,8 +103,9 @@ final class FreqProxTermsWriter extends
if (termsHash != null) {
termsHash.reset();
}
+ success = true;
} finally {
- consumer.close();
+ IOUtils.closeSafely(!success, consumer);
}
}
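
The success flag plus IOUtils.closeSafely(!success, consumer) idiom above closes the consumer on both paths but only suppresses a secondary close exception when the try body already failed, so the original exception is not masked. A rough stand-alone sketch of the same idiom without the Lucene helper (names are illustrative):

    import java.io.Closeable;
    import java.io.IOException;

    class CloseSafelySketch {
      static void writeAndClose(Closeable consumer, Runnable body) throws IOException {
        boolean success = false;
        try {
          body.run();
          success = true;
        } finally {
          closeSafely(!success, consumer);
        }
      }

      // Rough analogue of IOUtils.closeSafely(suppressExceptions, closeables...).
      static void closeSafely(boolean suppressExceptions, Closeable closeable) throws IOException {
        try {
          closeable.close();
        } catch (IOException e) {
          if (!suppressExceptions) {
            throw e;
          }
          // else: swallow, so the exception already in flight is not masked
        }
      }
    }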
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/FrozenBufferedDeletes.java Tue Aug 23 14:06:58 2011
@@ -21,31 +21,23 @@ import java.util.Iterator;
import java.util.Map;
import org.apache.lucene.search.Query;
+import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.index.BufferedDeletesStream.QueryAndLimit;
/** Holds buffered deletes by term or query, once pushed.
- * Pushed deltes are write-once, so we shift to more
+ * Pushed deletes are write-once, so we shift to more
* memory efficient data structure to hold them. We don't
* hold docIDs because these are applied on flush. */
class FrozenBufferedDeletes {
- /* Rough logic: Term is object w/
- String field and BytesRef text (OBJ_HEADER + 2*POINTER).
- String field is (OBJ_HEADER + 4*INT +
- POINTER + OBJ_HEADER + CHAR*field.length).
- Term's text is BytesRef (OBJ_HEADER + 2*INT + POINTER +
- OBJ_HEADER + bytes.length). */
- final static int BYTES_PER_DEL_TERM = 4*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 5*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + 6*RamUsageEstimator.NUM_BYTES_INT;
-
/* Query we often undercount (say 24 bytes), plus int. */
final static int BYTES_PER_DEL_QUERY = RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_INT + 24;
// Terms, in sorted order:
- // TODO: would be more RAM efficient to store BytesRef[],
- // per field:
- final Term[] terms;
+ final PrefixCodedTerms terms;
+ int termCount; // just for debugging
// Parallel array of deleted query, and the docIDUpto for
// each
@@ -62,7 +54,15 @@ class FrozenBufferedDeletes {
public FrozenBufferedDeletes(BufferedDeletes deletes, boolean isSegmentPrivate) {
this.isSegmentPrivate = isSegmentPrivate;
assert !isSegmentPrivate || deletes.terms.size() == 0 : "segment private package should only have del queries";
- terms = deletes.terms.keySet().toArray(new Term[deletes.terms.size()]);
+ Term termsArray[] = deletes.terms.keySet().toArray(new Term[deletes.terms.size()]);
+ termCount = termsArray.length;
+ ArrayUtil.mergeSort(termsArray);
+ PrefixCodedTerms.Builder builder = new PrefixCodedTerms.Builder();
+ for (Term term : termsArray) {
+ builder.add(term);
+ }
+ terms = builder.finish();
+
queries = new Query[deletes.queries.size()];
queryLimits = new int[deletes.queries.size()];
int upto = 0;
@@ -71,13 +71,8 @@ class FrozenBufferedDeletes {
queryLimits[upto] = ent.getValue();
upto++;
}
- int termDataBytes = 0;
- for(Map.Entry<Term,Integer> ent : deletes.terms.entrySet()) {
- final Term term = ent.getKey();
- termDataBytes += term.bytes().length;
- termDataBytes += term.field().length() * RamUsageEstimator.NUM_BYTES_CHAR;
- }
- bytesUsed = terms.length * BYTES_PER_DEL_TERM + queries.length * BYTES_PER_DEL_QUERY + termDataBytes;
+
+ bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY;
numTermDeletes = deletes.numTermDeletes.get();
}
@@ -95,24 +90,7 @@ class FrozenBufferedDeletes {
return new Iterable<Term>() {
@Override
public Iterator<Term> iterator() {
- return new Iterator<Term>() {
- private int upto;
-
- @Override
- public boolean hasNext() {
- return upto < terms.length;
- }
-
- @Override
- public Term next() {
- return terms[upto++];
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
+ return terms.iterator();
}
};
}
@@ -149,7 +127,7 @@ class FrozenBufferedDeletes {
public String toString() {
String s = "";
if (numTermDeletes != 0) {
- s += " " + numTermDeletes + " deleted terms (unique count=" + terms.length + ")";
+ s += " " + numTermDeletes + " deleted terms (unique count=" + termCount + ")";
}
if (queries.length != 0) {
s += " " + queries.length + " deleted queries";
@@ -162,6 +140,6 @@ class FrozenBufferedDeletes {
}
boolean any() {
- return terms.length > 0 || queries.length > 0;
+ return termCount > 0 || queries.length > 0;
}
}
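
Replacing the sorted Term[] with PrefixCodedTerms trades one Term object per delete for a compact, shared-prefix encoding of the sorted terms. A self-contained sketch of the underlying idea (an illustrative encoder, not the actual PrefixCodedTerms implementation; lengths are assumed to fit in one byte for brevity):

    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    class PrefixCodeSketch {
      static byte[] encode(String[] terms) {
        Arrays.sort(terms); // sorting makes adjacent terms share long prefixes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] prev = new byte[0];
        for (String term : terms) {
          byte[] cur = term.getBytes(StandardCharsets.UTF_8);
          int shared = 0;
          while (shared < prev.length && shared < cur.length && prev[shared] == cur[shared]) {
            shared++;
          }
          out.write(shared);                            // bytes shared with previous term
          out.write(cur.length - shared);               // suffix length
          out.write(cur, shared, cur.length - shared);  // suffix bytes only
          prev = cur;
        }
        return out.toByteArray();
      }
    }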
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Tue Aug 23 14:06:58 2011
@@ -447,7 +447,7 @@ final class IndexFileDeleter {
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
if (infoStream != null) {
- message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+ message("now checkpoint \"" + segmentInfos.toString(directory) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
}
// Try again now to delete any previously un-deletable
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexUpgrader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexUpgrader.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexUpgrader.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexUpgrader.java Tue Aug 23 14:06:58 2011
@@ -114,12 +114,12 @@ public final class IndexUpgrader {
}
public void upgrade() throws IOException {
- if (!IndexReader.indexExists(dir)) {
+ if (!IndexReader.indexExists(dir, iwc.getCodecProvider())) {
throw new IndexNotFoundException(dir.toString());
}
if (!deletePriorCommits) {
- final Collection<IndexCommit> commits = IndexReader.listCommits(dir);
+ final Collection<IndexCommit> commits = DirectoryReader.listCommits(dir, iwc.getCodecProvider());
if (commits.size() > 1) {
throw new IllegalArgumentException("This tool was invoked to not delete prior commit points, but the following commits were found: " + commits);
}
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue Aug 23 14:06:58 2011
@@ -53,7 +53,6 @@ import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MergeInfo;
import org.apache.lucene.util.BitVector;
-import org.apache.lucene.util.Bits;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.StringHelper;
@@ -355,7 +354,7 @@ public class IndexWriter implements Clos
poolReaders = true;
final IndexReader r;
doBeforeFlush();
- final boolean anySegmentFlushed;
+ boolean anySegmentFlushed = false;
/*
* for releasing a NRT reader we must ensure that
* DW doesn't add any segments or deletes until we are
@@ -378,14 +377,18 @@ public class IndexWriter implements Clos
// just like we do when loading segments_N
synchronized(this) {
maybeApplyDeletes(applyAllDeletes);
- r = new DirectoryReader(this, segmentInfos, config.getReaderTermsIndexDivisor(), codecs, applyAllDeletes);
+ r = new DirectoryReader(this, segmentInfos, codecs, applyAllDeletes);
if (infoStream != null) {
message("return reader version=" + r.getVersion() + " reader=" + r);
}
}
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "getReader");
+ // never reached but javac disagrees:
+ return null;
} finally {
if (!success && infoStream != null) {
- message("hit exception during while NRT reader");
+ message("hit exception during NRT reader");
}
// Done: finish the full flush!
docWriter.finishFullFlush(success);
@@ -416,18 +419,48 @@ public class IndexWriter implements Clos
* has been called on this instance). */
class ReaderPool {
+
+ final class SegmentCacheKey {
+ public final SegmentInfo si;
+ public final IOContext.Context context;
+
+ public SegmentCacheKey(SegmentInfo segInfo, IOContext.Context context) {
+ assert context == IOContext.Context.MERGE || context == IOContext.Context.READ;
+ this.si = segInfo;
+ this.context = context;
+ }
+
+ @Override
+ public int hashCode() {
+ return si.hashCode() + context.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "SegmentCacheKey(" + si + "," + context + ")";
+ }
+
+ @Override
+ public boolean equals(Object _other) {
+ if (!(_other instanceof SegmentCacheKey)) {
+ return false;
+ }
+ final SegmentCacheKey other = (SegmentCacheKey) _other;
+ return si.equals(other.si) && context == other.context;
+ }
+ }
- private final Map<SegmentInfo,SegmentReader> readerMap = new HashMap<SegmentInfo,SegmentReader>();
+ private final Map<SegmentCacheKey,SegmentReader> readerMap = new HashMap<SegmentCacheKey,SegmentReader>();
/** Forcefully clear changes for the specified segments. This is called on successful merge. */
synchronized void clear(List<SegmentInfo> infos) throws IOException {
if (infos == null) {
- for (Map.Entry<SegmentInfo,SegmentReader> ent: readerMap.entrySet()) {
+ for (Map.Entry<SegmentCacheKey,SegmentReader> ent: readerMap.entrySet()) {
ent.getValue().hasChanges = false;
}
} else {
for (final SegmentInfo info: infos) {
- final SegmentReader r = readerMap.get(info);
+ final SegmentReader r = readerMap.get(new SegmentCacheKey(info, IOContext.Context.MERGE));
if (r != null) {
r.hasChanges = false;
}
@@ -437,9 +470,13 @@ public class IndexWriter implements Clos
// used only by asserts
public synchronized boolean infoIsLive(SegmentInfo info) {
+ return infoIsLive(info, "");
+ }
+
+ public synchronized boolean infoIsLive(SegmentInfo info, String message) {
int idx = segmentInfos.indexOf(info);
- assert idx != -1: "info=" + info + " isn't in pool";
- assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos";
+ assert idx != -1: "info=" + info + " isn't live: " + message;
+ assert segmentInfos.info(idx) == info: "info=" + info + " doesn't match live info in segmentInfos: " + message;
return true;
}
@@ -460,8 +497,8 @@ public class IndexWriter implements Clos
* @param sr
* @throws IOException
*/
- public synchronized boolean release(SegmentReader sr) throws IOException {
- return release(sr, false);
+ public synchronized boolean release(SegmentReader sr, IOContext.Context context) throws IOException {
+ return release(sr, false, context);
}
/**
@@ -474,10 +511,32 @@ public class IndexWriter implements Clos
* @throws IOException
*/
public synchronized boolean release(SegmentReader sr, boolean drop) throws IOException {
+ final SegmentCacheKey cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.READ);
+ final SegmentReader other = readerMap.get(cacheKey);
+ if (sr == other) {
+ return release(sr, drop, IOContext.Context.READ);
+ } else {
+ assert sr == readerMap.get(new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.MERGE));
+ return release(sr, drop, IOContext.Context.MERGE);
+ }
+ }
+
+ /**
+ * Release the segment reader (i.e. decRef it and close if there
+ * are no more references).
+ * @return true if this release altered the index (eg
+ * the SegmentReader had pending changes to del docs and
+ * was closed). Caller must call checkpoint() if so.
+ * @param sr
+ * @throws IOException
+ */
+ public synchronized boolean release(SegmentReader sr, boolean drop, IOContext.Context context) throws IOException {
- final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
+ SegmentCacheKey cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), context);
+
+ final boolean pooled = readerMap.containsKey(cacheKey);
- assert !pooled || readerMap.get(sr.getSegmentInfo()) == sr;
+ assert !pooled || readerMap.get(cacheKey) == sr;
// Drop caller's ref; for an external reader (not
// pooled), this decRef will close it
@@ -492,9 +551,12 @@ public class IndexWriter implements Clos
// Discard (don't save) changes when we are dropping
// the reader; this is used only on the sub-readers
// after a successful merge.
- sr.hasChanges &= !drop;
-
- final boolean hasChanges = sr.hasChanges;
+ final boolean hasChanges;
+ if (drop) {
+ hasChanges = sr.hasChanges = false;
+ } else {
+ hasChanges = sr.hasChanges;
+ }
// Drop our ref -- this will commit any pending
// changes to the dir
@@ -502,7 +564,20 @@ public class IndexWriter implements Clos
// We are the last ref to this reader; since we're
// not pooling readers, we release it:
- readerMap.remove(sr.getSegmentInfo());
+ readerMap.remove(cacheKey);
+
+ if (drop && context == IOContext.Context.MERGE) {
+ // Also drop the READ reader if present: we don't
+ // need its deletes since they've been carried
+ // over to the merged segment
+ cacheKey = new SegmentCacheKey(sr.getSegmentInfo(), IOContext.Context.READ);
+ SegmentReader sr2 = readerMap.get(cacheKey);
+ if (sr2 != null) {
+ readerMap.remove(cacheKey);
+ sr2.hasChanges = false;
+ sr2.close();
+ }
+ }
return hasChanges;
}
@@ -511,16 +586,26 @@ public class IndexWriter implements Clos
}
public synchronized void drop(List<SegmentInfo> infos) throws IOException {
+ drop(infos, IOContext.Context.READ);
+ drop(infos, IOContext.Context.MERGE);
+ }
+
+ public synchronized void drop(List<SegmentInfo> infos, IOContext.Context context) throws IOException {
for(SegmentInfo info : infos) {
- drop(info);
+ drop(info, context);
}
}
public synchronized void drop(SegmentInfo info) throws IOException {
- final SegmentReader sr = readerMap.get(info);
- if (sr != null) {
+ drop(info, IOContext.Context.READ);
+ drop(info, IOContext.Context.MERGE);
+ }
+
+ public synchronized void drop(SegmentInfo info, IOContext.Context context) throws IOException {
+ final SegmentReader sr;
+ if ((sr = readerMap.remove(new SegmentCacheKey(info, context))) != null) {
sr.hasChanges = false;
- readerMap.remove(info);
+ readerMap.remove(new SegmentCacheKey(info, context));
sr.close();
}
}
@@ -532,14 +617,14 @@ public class IndexWriter implements Clos
// sync'd on IW:
assert Thread.holdsLock(IndexWriter.this);
- Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
+ Iterator<Map.Entry<SegmentCacheKey,SegmentReader>> iter = readerMap.entrySet().iterator();
while (iter.hasNext()) {
- Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
+ Map.Entry<SegmentCacheKey,SegmentReader> ent = iter.next();
SegmentReader sr = ent.getValue();
if (sr.hasChanges) {
- assert infoIsLive(sr.getSegmentInfo());
+ assert infoIsLive(sr.getSegmentInfo(), "key=" + ent.getKey());
sr.doCommit(null);
// Must checkpoint w/ deleter, because this
@@ -567,10 +652,9 @@ public class IndexWriter implements Clos
// We invoke deleter.checkpoint below, so we must be
// sync'd on IW:
assert Thread.holdsLock(IndexWriter.this);
-
+
for (SegmentInfo info : infos) {
-
- final SegmentReader sr = readerMap.get(info);
+ final SegmentReader sr = readerMap.get(new SegmentCacheKey(info, IOContext.Context.READ));
if (sr != null && sr.hasChanges) {
assert infoIsLive(info);
sr.doCommit(null);
@@ -582,13 +666,17 @@ public class IndexWriter implements Clos
}
}
+ public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, IOContext context) throws IOException {
+ return getReadOnlyClone(info, true, context);
+ }
+
/**
* Returns a ref to a clone. NOTE: this clone is not
* enrolled in the pool, so you should simply close()
* it when you're done (ie, do not call release()).
*/
- public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor, IOContext context) throws IOException {
- SegmentReader sr = get(info, doOpenStores, context, termInfosIndexDivisor);
+ public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, IOContext context) throws IOException {
+ SegmentReader sr = get(info, doOpenStores, context);
try {
return (SegmentReader) sr.clone(true);
} finally {
@@ -596,62 +684,37 @@ public class IndexWriter implements Clos
}
}
- /**
- * Obtain a SegmentReader from the readerPool. The reader
- * must be returned by calling {@link #release(SegmentReader)}
- * @see #release(SegmentReader)
- * @param info
- * @param doOpenStores
- * @throws IOException
- */
- public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, IOContext context) throws IOException {
- return get(info, doOpenStores, context, config.getReaderTermsIndexDivisor());
+ public synchronized SegmentReader get(SegmentInfo info, IOContext context) throws IOException {
+ return get(info, true, context);
}
/**
* Obtain a SegmentReader from the readerPool. The reader
* must be returned by calling {@link #release(SegmentReader)}
- *
* @see #release(SegmentReader)
* @param info
* @param doOpenStores
- * @param readBufferSize
- * @param termsIndexDivisor
* @throws IOException
*/
- public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, IOContext context, int termsIndexDivisor) throws IOException {
-
- // if (poolReaders) {
- // readBufferSize = BufferedIndexInput.BUFFER_SIZE;
- // }
-
- // TODO: context should be part of the key used to cache that reader in the pool.
+ public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, IOContext context) throws IOException {
- SegmentReader sr = readerMap.get(info);
+ SegmentCacheKey cacheKey = new SegmentCacheKey(info, context.context);
+ SegmentReader sr = readerMap.get(cacheKey);
if (sr == null) {
// TODO: we may want to avoid doing this while
// synchronized
// Returns a ref, which we xfer to readerMap:
- sr = SegmentReader.get(false, info.dir, info, doOpenStores, termsIndexDivisor, context);
+ sr = SegmentReader.get(false, info.dir, info, doOpenStores, context.context == IOContext.Context.MERGE ? -1 : config.getReaderTermsIndexDivisor(), context);
sr.readerFinishedListeners = readerFinishedListeners;
if (info.dir == directory) {
// Only pool if reader is not external
- readerMap.put(info, sr);
+ readerMap.put(cacheKey, sr);
}
} else {
if (doOpenStores) {
sr.openDocStores();
}
- if (termsIndexDivisor != -1) {
- // If this reader was originally opened because we
- // needed to merge it, we didn't load the terms
- // index. But now, if the caller wants the terms
- // index (eg because it's doing deletes, or an NRT
- // reader is being opened) we ask the reader to
- // load its terms index.
- sr.loadTermsIndex(termsIndexDivisor);
- }
}
// Return a ref to our caller
@@ -664,13 +727,23 @@ public class IndexWriter implements Clos
// Returns a ref
public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException {
- SegmentReader sr = readerMap.get(info);
+ SegmentReader sr = getIfExists(info, IOContext.Context.READ);
+ if (sr == null) {
+ sr = getIfExists(info, IOContext.Context.MERGE);
+ }
+ return sr;
+ }
+
+ // Returns a ref
+ public synchronized SegmentReader getIfExists(SegmentInfo info, IOContext.Context context) throws IOException {
+ SegmentCacheKey cacheKey = new SegmentCacheKey(info, context);
+ SegmentReader sr = readerMap.get(cacheKey);
if (sr != null) {
sr.incRef();
}
return sr;
}
- }
+ }
/**
* Obtain the number of deleted docs for a pooled reader.
@@ -681,13 +754,16 @@ public class IndexWriter implements Clos
SegmentReader reader = readerPool.getIfExists(info);
try {
if (reader != null) {
- return reader.numDeletedDocs();
+ // the pulled reader could be from an in-flight merge
+ // while the info we see may already have new deletes applied after a commit;
+ // we take the max since deletes never shrink
+ return Math.max(info.getDelCount(), reader.numDeletedDocs());
} else {
return info.getDelCount();
}
} finally {
if (reader != null) {
- readerPool.release(reader);
+ readerPool.release(reader, false);
}
}
}
@@ -759,6 +835,11 @@ public class IndexWriter implements Clos
bufferedDeletesStream.setInfoStream(infoStream);
poolReaders = conf.getReaderPooling();
+ writeLock = directory.makeLock(WRITE_LOCK_NAME);
+
+ if (!writeLock.obtain(conf.getWriteLockTimeout())) // obtain write lock
+ throw new LockObtainFailedException("Index locked for write: " + writeLock);
+
OpenMode mode = conf.getOpenMode();
boolean create;
if (mode == OpenMode.CREATE) {
@@ -769,12 +850,6 @@ public class IndexWriter implements Clos
// CREATE_OR_APPEND - create only if an index does not exist
create = !IndexReader.indexExists(directory);
}
-
- writeLock = directory.makeLock(WRITE_LOCK_NAME);
-
- if (!writeLock.obtain(conf.getWriteLockTimeout())) // obtain write lock
- throw new LockObtainFailedException("Index locked for write: " + writeLock);
-
boolean success = false;
// If index is too old, reading the segments will throw
@@ -1105,8 +1180,6 @@ public class IndexWriter implements Clos
/** Returns the Directory used by this index. */
public Directory getDirectory() {
- // Pass false because the flush during closing calls getDirectory
- ensureOpen(false);
return directory;
}
@@ -2269,6 +2342,10 @@ public class IndexWriter implements Clos
FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
// Lock order IW -> BDS
synchronized (bufferedDeletesStream) {
+ if (infoStream != null) {
+ message("publishFlushedSegment");
+ }
+
if (globalPacket != null && globalPacket.any()) {
bufferedDeletesStream.push(globalPacket);
}
@@ -2282,6 +2359,9 @@ public class IndexWriter implements Clos
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
+ if (infoStream != null) {
+ message("publish sets newSegment delGen=" + nextGen);
+ }
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
checkpoint();
@@ -2638,19 +2718,90 @@ public class IndexWriter implements Clos
*/
public final void prepareCommit(Map<String,String> commitUserData) throws CorruptIndexException, IOException {
+ if (infoStream != null) {
+ message("prepareCommit: flush");
+ message(" index before flush " + segString());
+ }
+
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
}
- if (pendingCommit != null)
+ if (pendingCommit != null) {
throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
+ }
- if (infoStream != null)
- message("prepareCommit: flush");
+ doBeforeFlush();
+ assert testPoint("startDoFlush");
+ SegmentInfos toCommit = null;
+ boolean anySegmentsFlushed = false;
- flush(true, true);
+ // This is copied from doFlush, except it's modified to
+ // clone & incRef the flushed SegmentInfos inside the
+ // sync block:
- startCommit(commitUserData);
+ try {
+
+ synchronized (fullFlushLock) {
+ boolean flushSuccess = false;
+ boolean success = false;
+ try {
+ anySegmentsFlushed = docWriter.flushAllThreads();
+ if (!anySegmentsFlushed) {
+ // prevent double increment since docWriter#doFlush increments the flush count
+ // if we flushed anything.
+ flushCount.incrementAndGet();
+ }
+ flushSuccess = true;
+
+ synchronized(this) {
+ maybeApplyDeletes(true);
+
+ readerPool.commit(segmentInfos);
+
+ // Must clone the segmentInfos while we still
+ // hold fullFlushLock and while sync'd so that
+ // no partial changes (eg a delete w/o
+ // corresponding add from an updateDocument) can
+ // sneak into the commit point:
+ toCommit = (SegmentInfos) segmentInfos.clone();
+
+ pendingCommitChangeCount = changeCount;
+
+ // This protects the segmentInfos we are now going
+ // to commit. This is important in case, eg, while
+ // we are trying to sync all referenced files, a
+ // merge completes which would otherwise have
+ // removed the files we are now syncing.
+ deleter.incRef(toCommit, false);
+ }
+ success = true;
+ } finally {
+ if (!success && infoStream != null) {
+ message("hit exception during prepareCommit");
+ }
+ // Done: finish the full flush!
+ docWriter.finishFullFlush(flushSuccess);
+ doAfterFlush();
+ }
+ }
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "prepareCommit");
+ }
+
+ boolean success = false;
+ try {
+ if (anySegmentsFlushed) {
+ maybeMerge();
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ deleter.decRef(toCommit);
+ }
+ }
+
+ startCommit(toCommit, commitUserData);
}
// Used only by commit, below; lock order is commitLock -> IW
@@ -2841,19 +2992,18 @@ public class IndexWriter implements Clos
} else if (infoStream != null) {
message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
}
-
}
final synchronized void applyAllDeletes() throws IOException {
flushDeletesCount.incrementAndGet();
- final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
- .applyDeletes(readerPool, segmentInfos.asList());
+ final BufferedDeletesStream.ApplyDeletesResult result;
+ result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos.asList());
if (result.anyDeletes) {
checkpoint();
}
if (!keepFullyDeletedSegments && result.allDeleted != null) {
if (infoStream != null) {
- message("drop 100% deleted segments: " + result.allDeleted);
+ message("drop 100% deleted segments: " + segString(result.allDeleted));
}
for (SegmentInfo info : result.allDeleted) {
// If a merge has already registered for this
@@ -2929,16 +3079,27 @@ public class IndexWriter implements Clos
for(int i=0; i < sourceSegments.size(); i++) {
SegmentInfo info = sourceSegments.get(i);
minGen = Math.min(info.getBufferedDeletesGen(), minGen);
- int docCount = info.docCount;
- final SegmentReader previousReader = merge.readerClones.get(i);
- if (previousReader == null) {
- // Reader was skipped because it was 100% deletions
- continue;
- }
- final Bits prevLiveDocs = previousReader.getLiveDocs();
- final SegmentReader currentReader = merge.readers.get(i);
- final Bits currentLiveDocs = currentReader.getLiveDocs();
- if (previousReader.hasDeletions()) {
+ final int docCount = info.docCount;
+ final BitVector prevLiveDocs = merge.readerLiveDocs.get(i);
+ final BitVector currentLiveDocs;
+ {
+ final SegmentReader currentReader = readerPool.getIfExists(info, IOContext.Context.READ);
+ if (currentReader != null) {
+ currentLiveDocs = (BitVector) currentReader.getLiveDocs();
+ readerPool.release(currentReader, false, IOContext.Context.READ);
+ } else {
+ assert readerPool.infoIsLive(info);
+ if (info.hasDeletions()) {
+ currentLiveDocs = new BitVector(directory,
+ info.getDelFileName(),
+ new IOContext(IOContext.Context.READ));
+ } else {
+ currentLiveDocs = null;
+ }
+ }
+ }
+
+ if (prevLiveDocs != null) {
// There were deletes on this segment when the merge
// started. The merge has collapsed away those
@@ -2947,14 +3108,14 @@ public class IndexWriter implements Clos
// newly flushed deletes but mapping them to the new
// docIDs.
- if (currentReader.numDeletedDocs() > previousReader.numDeletedDocs()) {
- // This means this segment has had new deletes
- // committed since we started the merge, so we
+ if (currentLiveDocs.count() < prevLiveDocs.count()) {
+ // This means this segment received new deletes
+ // since we started the merge, so we
// must merge them:
for(int j=0;j<docCount;j++) {
- if (!prevLiveDocs.get(j))
+ if (!prevLiveDocs.get(j)) {
assert !currentLiveDocs.get(j);
- else {
+ } else {
if (!currentLiveDocs.get(j)) {
mergedReader.doDelete(docUpto);
delCount++;
@@ -2963,9 +3124,10 @@ public class IndexWriter implements Clos
}
}
} else {
- docUpto += docCount - previousReader.numDeletedDocs();
+ assert currentLiveDocs.count() == prevLiveDocs.count(): "currentLiveDocs.count()==" + currentLiveDocs.count() + " vs prevLiveDocs.count()=" + prevLiveDocs.count() + " info=" + info;
+ docUpto += currentLiveDocs.count();
}
- } else if (currentReader.hasDeletions()) {
+ } else if (currentLiveDocs != null) {
// This segment had no deletes before but now it
// does:
for(int j=0; j<docCount; j++) {
@@ -2975,9 +3137,10 @@ public class IndexWriter implements Clos
}
docUpto++;
}
- } else
+ } else {
// No deletes before or after
docUpto += info.docCount;
+ }
}
assert mergedReader.numDeletedDocs() == delCount;
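
The reworked commitMergedDeletes above compares the live-docs snapshot taken when the merge started against the current live docs, and re-applies any deletes that arrived mid-merge at their remapped docIDs in the merged segment. A self-contained sketch of that carry-over loop, with plain boolean[] standing in for BitVector and illustrative names:

    class CarryOverDeletesSketch {
      interface MergedSegment { void delete(int mergedDocId); }

      static int carryOver(boolean[] prevLive, boolean[] currentLive, MergedSegment merged) {
        int delCount = 0;
        int docUpto = 0; // docID in the merged segment
        for (int j = 0; j < prevLive.length; j++) {
          if (!prevLive[j]) {
            // Already deleted when the merge started: the merge dropped this doc,
            // so it has no docID in the merged segment and must stay deleted now.
            assert !currentLive[j];
            continue;
          }
          if (!currentLive[j]) {
            // Deleted after the merge started: re-apply at the remapped docID.
            merged.delete(docUpto);
            delCount++;
          }
          docUpto++;
        }
        return delCount;
      }
    }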
@@ -3373,13 +3536,14 @@ public class IndexWriter implements Clos
private final synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
final int numSegments = merge.readers.size();
Throwable th = null;
-
+
boolean anyChanges = false;
boolean drop = !suppressExceptions;
+
for (int i = 0; i < numSegments; i++) {
if (merge.readers.get(i) != null) {
try {
- anyChanges |= readerPool.release(merge.readers.get(i), drop);
+ anyChanges |= readerPool.release(merge.readers.get(i), drop, IOContext.Context.MERGE);
} catch (Throwable t) {
if (th == null) {
th = t;
@@ -3387,20 +3551,6 @@ public class IndexWriter implements Clos
}
merge.readers.set(i, null);
}
-
- if (i < merge.readerClones.size() && merge.readerClones.get(i) != null) {
- try {
- merge.readerClones.get(i).close();
- } catch (Throwable t) {
- if (th == null) {
- th = t;
- }
- }
- // This was a private clone and we had the
- // only reference
- assert merge.readerClones.get(i).getRefCount() == 0: "refCount should be 0 but is " + merge.readerClones.get(i).getRefCount();
- merge.readerClones.set(i, null);
- }
}
if (suppressExceptions && anyChanges) {
@@ -3416,6 +3566,27 @@ public class IndexWriter implements Clos
}
}
+ private synchronized BitVector getLiveDocsClone(SegmentInfo info, SegmentReader other) throws IOException {
+ final SegmentReader delReader = readerPool.getIfExists(info, IOContext.Context.READ);
+ BitVector liveDocs;
+ if (delReader != null) {
+ liveDocs = (BitVector) delReader.getLiveDocs();
+ readerPool.release(delReader, false, IOContext.Context.READ);
+ if (liveDocs != null) {
+ // We clone the del docs because other
+ // deletes may come in while we're merging. We
+ // need frozen deletes while merging, and then
+ // we carry over any new deletions in
+ // commitMergedDeletes.
+ liveDocs = (BitVector) liveDocs.clone();
+ }
+ } else {
+ liveDocs = (BitVector) other.getLiveDocs();
+ }
+
+ return liveDocs;
+ }
+
/** Does the actual (time-consuming) work of the merge,
* but without holding synchronized lock on IndexWriter
* instance */
@@ -3440,7 +3611,8 @@ public class IndexWriter implements Clos
}
merge.readers = new ArrayList<SegmentReader>();
- merge.readerClones = new ArrayList<SegmentReader>();
+ merge.readerLiveDocs = new ArrayList<BitVector>();
+
// This is try/finally to make sure merger's readers are
// closed:
boolean success = false;
@@ -3453,20 +3625,17 @@ public class IndexWriter implements Clos
// Hold onto the "live" reader; we will use this to
// commit merged deletes
- final SegmentReader reader = readerPool.get(info, true,
- context,
- -config.getReaderTermsIndexDivisor());
+ final SegmentReader reader = readerPool.get(info, context);
+
+ // Carefully pull the most recent live docs:
+ final BitVector liveDocs = getLiveDocsClone(info, reader);
+
+ merge.readerLiveDocs.add(liveDocs);
merge.readers.add(reader);
- // We clone the segment readers because other
- // deletes may come in while we're merging so we
- // need readers that will not change
- final SegmentReader clone = (SegmentReader) reader.clone(true);
- merge.readerClones.add(clone);
-
- if (clone.numDocs() > 0) {
- merger.add(clone);
- totDocCount += clone.numDocs();
+ if (liveDocs == null || liveDocs.count() > 0) {
+ merger.add(reader, liveDocs);
+ totDocCount += liveDocs == null ? reader.maxDoc() : liveDocs.count();
}
segUpto++;
}
@@ -3562,25 +3731,24 @@ public class IndexWriter implements Clos
}
final IndexReaderWarmer mergedSegmentWarmer = config.getMergedSegmentWarmer();
- final int termsIndexDivisor;
- final boolean loadDocStores;
+ // TODO: in the non-realtime case, we may want to only
+ // keep deletes (it's costly to open entire reader
+ // when we just need deletes)
+
+ final boolean loadDocStores;
if (mergedSegmentWarmer != null) {
// Load terms index & doc stores so the segment
// warmer can run searches, load documents/term
// vectors
- termsIndexDivisor = config.getReaderTermsIndexDivisor();
loadDocStores = true;
} else {
- termsIndexDivisor = -1;
loadDocStores = false;
}
- // TODO: in the non-realtime case, we may want to only
- // keep deletes (it's costly to open entire reader
- // when we just need deletes)
-
- final SegmentReader mergedReader = readerPool.get(merge.info, loadDocStores, context, termsIndexDivisor);
+ // Force READ context because we merge deletes onto
+ // this reader:
+ final SegmentReader mergedReader = readerPool.get(merge.info, loadDocStores, new IOContext(IOContext.Context.READ));
try {
if (poolReaders && mergedSegmentWarmer != null) {
mergedSegmentWarmer.warm(mergedReader);
@@ -3592,7 +3760,7 @@ public class IndexWriter implements Clos
}
} finally {
synchronized(this) {
- if (readerPool.release(mergedReader)) {
+ if (readerPool.release(mergedReader, IOContext.Context.READ)) {
// Must checkpoint after releasing the
// mergedReader since it may have written a new
// deletes file:
@@ -3667,7 +3835,7 @@ public class IndexWriter implements Clos
}
} finally {
if (reader != null) {
- readerPool.release(reader);
+ readerPool.release(reader, false);
}
}
return buffer.toString();
@@ -3721,7 +3889,7 @@ public class IndexWriter implements Clos
* if it wasn't already. If that succeeds, then we
* prepare a new segments_N file but do not fully commit
* it. */
- private void startCommit(Map<String,String> commitUserData) throws IOException {
+ private void startCommit(final SegmentInfos toSync, final Map<String,String> commitUserData) throws IOException {
assert testPoint("startStartCommit");
assert pendingCommit == null;
@@ -3732,44 +3900,31 @@ public class IndexWriter implements Clos
try {
- if (infoStream != null)
+ if (infoStream != null) {
message("startCommit(): start");
-
- final SegmentInfos toSync;
- final long myChangeCount;
+ }
synchronized(this) {
assert lastCommitChangeCount <= changeCount;
- myChangeCount = changeCount;
- if (changeCount == lastCommitChangeCount) {
- if (infoStream != null)
+ if (pendingCommitChangeCount == lastCommitChangeCount) {
+ if (infoStream != null) {
message(" skip startCommit(): no changes pending");
+ }
+ deleter.decRef(toSync);
return;
}
- // First, we clone & incref the segmentInfos we intend
- // to sync, then, without locking, we sync() all files
- // referenced by toSync, in the background.
-
- if (infoStream != null)
- message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
-
- readerPool.commit(segmentInfos);
- toSync = (SegmentInfos) segmentInfos.clone();
+ if (infoStream != null) {
+ message("startCommit index=" + segString(toSync) + " changeCount=" + changeCount);
+ }
assert filesExist(toSync);
- if (commitUserData != null)
+ if (commitUserData != null) {
toSync.setUserData(commitUserData);
-
- // This protects the segmentInfos we are now going
- // to commit. This is important in case, eg, while
- // we are trying to sync all referenced files, a
- // merge completes which would otherwise have
- // removed the files we are now syncing.
- deleter.incRef(toSync, false);
+ }
}
assert testPoint("midStartCommit");
@@ -3794,19 +3949,18 @@ public class IndexWriter implements Clos
// an exception)
toSync.prepareCommit(directory);
- pendingCommit = toSync;
pendingCommitSet = true;
- pendingCommitChangeCount = myChangeCount;
+ pendingCommit = toSync;
}
- if (infoStream != null)
+ if (infoStream != null) {
message("done all syncs");
+ }
assert testPoint("midStartCommitSuccess");
} finally {
synchronized(this) {
-
// Have our master segmentInfos record the
// generations we just prepared. We do this
// on error or success so we don't
@@ -3818,6 +3972,7 @@ public class IndexWriter implements Clos
message("hit exception committing segments file");
}
+ // Hit exception
deleter.decRef(toSync);
}
}
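
The ReaderPool changes above key pooled readers by (SegmentInfo, IOContext.Context) instead of SegmentInfo alone, so a segment's merge-time reader and its search-time reader are tracked separately and can be dropped independently. A generic sketch of that kind of composite map key (illustrative names, not the Lucene classes):

    import java.util.HashMap;
    import java.util.Map;

    class ReaderPoolKeySketch {
      enum Context { READ, MERGE }

      static final class CacheKey {
        final String segmentName;
        final Context context;

        CacheKey(String segmentName, Context context) {
          this.segmentName = segmentName;
          this.context = context;
        }

        @Override public int hashCode() {
          return segmentName.hashCode() + context.hashCode();
        }

        @Override public boolean equals(Object other) {
          if (!(other instanceof CacheKey)) {
            return false;
          }
          CacheKey o = (CacheKey) other;
          return segmentName.equals(o.segmentName) && context == o.context;
        }
      }

      private final Map<CacheKey, Object> readerMap = new HashMap<CacheKey, Object>();

      // The same segment can be pooled twice: once per context.
      Object get(String segmentName, Context context) {
        return readerMap.get(new CacheKey(segmentName, context));
      }
    }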
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/IndexWriterConfig.java Tue Aug 23 14:06:58 2011
@@ -164,6 +164,7 @@ public final class IndexWriterConfig imp
} else {
mergePolicy = new LogByteSizeMergePolicy();
}
+ flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = DEFAULT_READER_POOLING;
indexerThreadPool = new ThreadAffinityDocumentsWriterThreadPool();
readerTermsIndexDivisor = DEFAULT_READER_TERMS_INDEX_DIVISOR;
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/MergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/MergePolicy.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/MergePolicy.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/MergePolicy.java Tue Aug 23 14:06:58 2011
@@ -17,16 +17,17 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.util.SetOnce;
-import org.apache.lucene.util.SetOnce.AlreadySetException;
-
import java.io.IOException;
-import java.util.List;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.util.BitVector;
+import org.apache.lucene.util.SetOnce.AlreadySetException;
+import org.apache.lucene.util.SetOnce;
+
/**
* <p>Expert: a MergePolicy determines the sequence of
* primitive merge operations to be used for overall merge
@@ -75,7 +76,7 @@ public abstract class MergePolicy implem
int maxNumSegmentsOptimize; // used by IndexWriter
public long estimatedMergeBytes; // used by IndexWriter
List<SegmentReader> readers; // used by IndexWriter
- List<SegmentReader> readerClones; // used by IndexWriter
+ List<BitVector> readerLiveDocs; // used by IndexWriter
public final List<SegmentInfo> segments;
public final int totalDocCount;
boolean aborted;
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/MultiTerms.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/MultiTerms.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/MultiTerms.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/MultiTerms.java Tue Aug 23 14:06:58 2011
@@ -17,13 +17,15 @@ package org.apache.lucene.index;
* limitations under the License.
*/
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.ReaderUtil;
-
import java.io.IOException;
-import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.ReaderUtil;
+
+import org.apache.lucene.util.automaton.CompiledAutomaton;
/**
* Exposes flex API, merged from flex API of
@@ -59,6 +61,23 @@ public final class MultiTerms extends Te
}
@Override
+ public TermsEnum intersect(CompiledAutomaton compiled, BytesRef startTerm) throws IOException {
+ final List<MultiTermsEnum.TermsEnumIndex> termsEnums = new ArrayList<MultiTermsEnum.TermsEnumIndex>();
+ for(int i=0;i<subs.length;i++) {
+ final TermsEnum termsEnum = subs[i].intersect(compiled, startTerm);
+ if (termsEnum != null) {
+ termsEnums.add(new MultiTermsEnum.TermsEnumIndex(termsEnum, i));
+ }
+ }
+
+ if (termsEnums.size() > 0) {
+ return new MultiTermsEnum(subSlices).reset(termsEnums.toArray(MultiTermsEnum.TermsEnumIndex.EMPTY_ARRAY));
+ } else {
+ return TermsEnum.EMPTY;
+ }
+ }
+
+ @Override
public TermsEnum iterator() throws IOException {
final List<MultiTermsEnum.TermsEnumIndex> termsEnums = new ArrayList<MultiTermsEnum.TermsEnumIndex>();
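
The new MultiTerms.intersect override collects each sub-reader's intersected enum and merges them, so automaton-driven term enumeration works across segments. A rough usage sketch, assuming the Terms/CompiledAutomaton APIs on trunk at this time (the constructor and method signatures below are assumptions, not confirmed by this diff):

    import java.io.IOException;
    import org.apache.lucene.index.Terms;
    import org.apache.lucene.index.TermsEnum;
    import org.apache.lucene.util.BytesRef;
    import org.apache.lucene.util.automaton.CompiledAutomaton;
    import org.apache.lucene.util.automaton.RegExp;

    class IntersectUsageSketch {
      static void printMatching(Terms terms) throws IOException {
        // Enumerate only terms starting with "luc"; MultiTerms merges the
        // per-segment enums behind this single TermsEnum.
        CompiledAutomaton compiled = new CompiledAutomaton(new RegExp("luc.*").toAutomaton());
        TermsEnum te = terms.intersect(compiled, null); // null = start from the first term
        BytesRef term;
        while ((term = te.next()) != null) {
          System.out.println(term.utf8ToString());
        }
      }
    }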
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/PerFieldCodecWrapper.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/PerFieldCodecWrapper.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/PerFieldCodecWrapper.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/PerFieldCodecWrapper.java Tue Aug 23 14:06:58 2011
@@ -174,14 +174,6 @@ final class PerFieldCodecWrapper extends
public void close() throws IOException {
IOUtils.closeSafely(false, codecs.values());
}
-
- @Override
- public void loadTermsIndex(int indexDivisor) throws IOException {
- Iterator<FieldsProducer> it = codecs.values().iterator();
- while (it.hasNext()) {
- it.next().loadTermsIndex(indexDivisor);
- }
- }
}
@Override
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentInfo.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentInfo.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentInfo.java Tue Aug 23 14:06:58 2011
@@ -715,8 +715,14 @@ public final class SegmentInfo implement
if (getHasVectors()) {
s.append('v');
}
- } catch (IOException e) {
- throw new RuntimeException(e);
+ } catch (Throwable e) {
+ // Messy: because getHasVectors may be used in an
+ // un-thread-safe way, and may attempt to open an fnm
+ // file that has since (legitimately) been deleted by
+ // IndexWriter, instead of throwing these exceptions
+ // up, just add v? to indicate we don't know if this
+ // segment has vectors:
+ s.append("v?");
}
s.append(docCount);
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentMerger.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentMerger.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentMerger.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentMerger.java Tue Aug 23 14:06:58 2011
@@ -39,7 +39,6 @@ import org.apache.lucene.store.IndexInpu
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.MultiBits;
import org.apache.lucene.util.ReaderUtil;
/**
@@ -55,7 +54,7 @@ final class SegmentMerger {
private String segment;
private int termIndexInterval = IndexWriterConfig.DEFAULT_TERM_INDEX_INTERVAL;
- private List<IndexReader> readers = new ArrayList<IndexReader>();
+ private List<MergeState.IndexReaderAndLiveDocs> readers = new ArrayList<MergeState.IndexReaderAndLiveDocs>();
private final FieldInfos fieldInfos;
private int mergedDocs;
@@ -101,7 +100,21 @@ final class SegmentMerger {
* @param reader
*/
final void add(IndexReader reader) {
- ReaderUtil.gatherSubReaders(readers, reader);
+ try {
+ new ReaderUtil.Gather(reader) {
+ @Override
+ protected void add(int base, IndexReader r) {
+ readers.add(new MergeState.IndexReaderAndLiveDocs(r, r.getLiveDocs()));
+ }
+ }.run();
+ } catch (IOException ioe) {
+ // won't happen
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ final void add(SegmentReader reader, Bits liveDocs) {
+ readers.add(new MergeState.IndexReaderAndLiveDocs(reader, liveDocs));
}
/**
@@ -123,8 +136,9 @@ final class SegmentMerger {
mergePerDoc();
mergeNorms();
- if (fieldInfos.hasVectors())
+ if (fieldInfos.hasVectors()) {
mergeVectors();
+ }
return mergedDocs;
}
@@ -189,9 +203,9 @@ final class SegmentMerger {
// FieldInfos, then we can do a bulk copy of the
// stored fields:
for (int i = 0; i < numReaders; i++) {
- IndexReader reader = readers.get(i);
- if (reader instanceof SegmentReader) {
- SegmentReader segmentReader = (SegmentReader) reader;
+ MergeState.IndexReaderAndLiveDocs reader = readers.get(i);
+ if (reader.reader instanceof SegmentReader) {
+ SegmentReader segmentReader = (SegmentReader) reader.reader;
boolean same = true;
FieldInfos segmentFieldInfos = segmentReader.fieldInfos();
for (FieldInfo fi : segmentFieldInfos) {
@@ -216,7 +230,8 @@ final class SegmentMerger {
* @throws IOException if there is a low-level IO error
*/
private int mergeFields() throws CorruptIndexException, IOException {
- for (IndexReader reader : readers) {
+ for (MergeState.IndexReaderAndLiveDocs readerAndLiveDocs : readers) {
+ final IndexReader reader = readerAndLiveDocs.reader;
if (reader instanceof SegmentReader) {
SegmentReader segmentReader = (SegmentReader) reader;
FieldInfos readerFieldInfos = segmentReader.fieldInfos();
@@ -245,7 +260,7 @@ final class SegmentMerger {
final FieldsWriter fieldsWriter = new FieldsWriter(directory, segment, context);
try {
int idx = 0;
- for (IndexReader reader : readers) {
+ for (MergeState.IndexReaderAndLiveDocs reader : readers) {
final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++];
FieldsReader matchingFieldsReader = null;
if (matchingSegmentReader != null) {
@@ -254,7 +269,7 @@ final class SegmentMerger {
matchingFieldsReader = fieldsReader;
}
}
- if (reader.hasDeletions()) {
+ if (reader.liveDocs != null) {
docCount += copyFieldsWithDeletions(fieldsWriter,
reader, matchingFieldsReader);
} else {
@@ -281,12 +296,12 @@ final class SegmentMerger {
return docCount;
}
- private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final IndexReader reader,
+ private int copyFieldsWithDeletions(final FieldsWriter fieldsWriter, final MergeState.IndexReaderAndLiveDocs reader,
final FieldsReader matchingFieldsReader)
throws IOException, MergeAbortedException, CorruptIndexException {
int docCount = 0;
- final int maxDoc = reader.maxDoc();
- final Bits liveDocs = reader.getLiveDocs();
+ final int maxDoc = reader.reader.maxDoc();
+ final Bits liveDocs = reader.liveDocs;
assert liveDocs != null;
if (matchingFieldsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
@@ -322,7 +337,7 @@ final class SegmentMerger {
}
// NOTE: it's very important to first assign to doc then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
- Document doc = reader.document(j);
+ Document doc = reader.reader.document(j);
fieldsWriter.addDocument(doc, fieldInfos);
docCount++;
checkAbort.work(300);
@@ -331,10 +346,10 @@ final class SegmentMerger {
return docCount;
}
- private int copyFieldsNoDeletions(final FieldsWriter fieldsWriter, final IndexReader reader,
+ private int copyFieldsNoDeletions(final FieldsWriter fieldsWriter, final MergeState.IndexReaderAndLiveDocs reader,
final FieldsReader matchingFieldsReader)
throws IOException, MergeAbortedException, CorruptIndexException {
- final int maxDoc = reader.maxDoc();
+ final int maxDoc = reader.reader.maxDoc();
int docCount = 0;
if (matchingFieldsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
@@ -349,7 +364,7 @@ final class SegmentMerger {
for (; docCount < maxDoc; docCount++) {
// NOTE: it's very important to first assign to doc then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
- Document doc = reader.document(docCount);
+ Document doc = reader.reader.document(docCount);
fieldsWriter.addDocument(doc, fieldInfos);
checkAbort.work(300);
}
@@ -362,12 +377,11 @@ final class SegmentMerger {
* @throws IOException
*/
private final void mergeVectors() throws IOException {
- TermVectorsWriter termVectorsWriter =
- new TermVectorsWriter(directory, segment, fieldInfos, context);
+ TermVectorsWriter termVectorsWriter = new TermVectorsWriter(directory, segment, fieldInfos, context);
try {
int idx = 0;
- for (final IndexReader reader : readers) {
+ for (final MergeState.IndexReaderAndLiveDocs reader : readers) {
final SegmentReader matchingSegmentReader = matchingSegmentReaders[idx++];
TermVectorsReader matchingVectorsReader = null;
if (matchingSegmentReader != null) {
@@ -378,11 +392,10 @@ final class SegmentMerger {
matchingVectorsReader = vectorsReader;
}
}
- if (reader.hasDeletions()) {
+ if (reader.liveDocs != null) {
copyVectorsWithDeletions(termVectorsWriter, matchingVectorsReader, reader);
} else {
copyVectorsNoDeletions(termVectorsWriter, matchingVectorsReader, reader);
-
}
}
} finally {
@@ -403,10 +416,10 @@ final class SegmentMerger {
private void copyVectorsWithDeletions(final TermVectorsWriter termVectorsWriter,
final TermVectorsReader matchingVectorsReader,
- final IndexReader reader)
+ final MergeState.IndexReaderAndLiveDocs reader)
throws IOException, MergeAbortedException {
- final int maxDoc = reader.maxDoc();
- final Bits liveDocs = reader.getLiveDocs();
+ final int maxDoc = reader.reader.maxDoc();
+ final Bits liveDocs = reader.liveDocs;
if (matchingVectorsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
for (int docNum = 0; docNum < maxDoc;) {
@@ -441,7 +454,7 @@ final class SegmentMerger {
// NOTE: it's very important to first assign to vectors then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
- TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
+ TermFreqVector[] vectors = reader.reader.getTermFreqVectors(docNum);
termVectorsWriter.addAllDocVectors(vectors);
checkAbort.work(300);
}
@@ -450,9 +463,9 @@ final class SegmentMerger {
private void copyVectorsNoDeletions(final TermVectorsWriter termVectorsWriter,
final TermVectorsReader matchingVectorsReader,
- final IndexReader reader)
+ final MergeState.IndexReaderAndLiveDocs reader)
throws IOException, MergeAbortedException {
- final int maxDoc = reader.maxDoc();
+ final int maxDoc = reader.reader.maxDoc();
if (matchingVectorsReader != null) {
// We can bulk-copy because the fieldInfos are "congruent"
int docCount = 0;
@@ -467,7 +480,7 @@ final class SegmentMerger {
for (int docNum = 0; docNum < maxDoc; docNum++) {
// NOTE: it's very important to first assign to vectors then pass it to
// termVectorsWriter.addAllDocVectors; see LUCENE-1282
- TermFreqVector[] vectors = reader.getTermFreqVectors(docNum);
+ TermFreqVector[] vectors = reader.reader.getTermFreqVectors(docNum);
termVectorsWriter.addAllDocVectors(vectors);
checkAbort.work(300);
}
@@ -488,23 +501,17 @@ final class SegmentMerger {
final List<Fields> fields = new ArrayList<Fields>();
final List<ReaderUtil.Slice> slices = new ArrayList<ReaderUtil.Slice>();
- final List<Bits> bits = new ArrayList<Bits>();
- final List<Integer> bitsStarts = new ArrayList<Integer>();
- for(IndexReader r : readers) {
- final Fields f = r.fields();
- final int maxDoc = r.maxDoc();
+ for(MergeState.IndexReaderAndLiveDocs r : readers) {
+ final Fields f = r.reader.fields();
+ final int maxDoc = r.reader.maxDoc();
if (f != null) {
slices.add(new ReaderUtil.Slice(docBase, maxDoc, fields.size()));
fields.add(f);
- bits.add(r.getLiveDocs());
- bitsStarts.add(docBase);
}
docBase += maxDoc;
}
- bitsStarts.add(docBase);
-
// we may gather more readers than mergeState.readerCount
mergeState = new MergeState();
mergeState.readers = readers;
@@ -513,7 +520,6 @@ final class SegmentMerger {
mergeState.mergedDocCount = mergedDocs;
// Remap docIDs
- mergeState.delCounts = new int[mergeState.readerCount];
mergeState.docMaps = new int[mergeState.readerCount][];
mergeState.docBase = new int[mergeState.readerCount];
mergeState.hasPayloadProcessorProvider = payloadProcessorProvider != null;
@@ -526,73 +532,62 @@ final class SegmentMerger {
for(int i=0;i<mergeState.readerCount;i++) {
- final IndexReader reader = readers.get(i);
+ final MergeState.IndexReaderAndLiveDocs reader = readers.get(i);
- mergeState.delCounts[i] = reader.numDeletedDocs();
mergeState.docBase[i] = docBase;
- docBase += reader.numDocs();
- inputDocBase += reader.maxDoc();
- if (mergeState.delCounts[i] != 0) {
+ inputDocBase += reader.reader.maxDoc();
+ final int maxDoc = reader.reader.maxDoc();
+ if (reader.liveDocs != null) {
int delCount = 0;
- final Bits liveDocs = reader.getLiveDocs();
+ final Bits liveDocs = reader.liveDocs;
assert liveDocs != null;
- final int maxDoc = reader.maxDoc();
final int[] docMap = mergeState.docMaps[i] = new int[maxDoc];
int newDocID = 0;
for(int j=0;j<maxDoc;j++) {
if (!liveDocs.get(j)) {
docMap[j] = -1;
- delCount++; // only for assert
+ delCount++;
} else {
docMap[j] = newDocID++;
}
}
- assert delCount == mergeState.delCounts[i]: "reader delCount=" + mergeState.delCounts[i] + " vs recomputed delCount=" + delCount;
+ docBase += maxDoc - delCount;
+ } else {
+ docBase += maxDoc;
}
if (payloadProcessorProvider != null) {
- mergeState.dirPayloadProcessor[i] = payloadProcessorProvider.getDirProcessor(reader.directory());
+ mergeState.dirPayloadProcessor[i] = payloadProcessorProvider.getDirProcessor(reader.reader.directory());
}
}
codec = segmentWriteState.segmentCodecs.codec();
final FieldsConsumer consumer = codec.fieldsConsumer(segmentWriteState);
+ boolean success = false;
try {
- // NOTE: this is silly, yet, necessary -- we create a
- // MultiBits as our skip docs only to have it broken
- // apart when we step through the docs enums in
- // MultiDocsEnum.
- mergeState.multiLiveDocs = new MultiBits(bits, bitsStarts, true);
-
consumer.merge(mergeState,
new MultiFields(fields.toArray(Fields.EMPTY_ARRAY),
slices.toArray(ReaderUtil.Slice.EMPTY_ARRAY)));
+ success = true;
} finally {
- consumer.close();
+ IOUtils.closeSafely(!success, consumer);
}
}
private void mergePerDoc() throws IOException {
final List<PerDocValues> perDocProducers = new ArrayList<PerDocValues>();
final List<ReaderUtil.Slice> perDocSlices = new ArrayList<ReaderUtil.Slice>();
- final List<Bits> perDocBits = new ArrayList<Bits>();
- final List<Integer> perDocBitsStarts = new ArrayList<Integer>();
int docBase = 0;
- for (IndexReader r : readers) {
- final int maxDoc = r.maxDoc();
- final PerDocValues producer = r.perDocValues();
+ for (MergeState.IndexReaderAndLiveDocs r : readers) {
+ final int maxDoc = r.reader.maxDoc();
+ final PerDocValues producer = r.reader.perDocValues();
if (producer != null) {
perDocSlices.add(new ReaderUtil.Slice(docBase, maxDoc, perDocProducers
.size()));
perDocProducers.add(producer);
- perDocBits.add(r.getLiveDocs());
- perDocBitsStarts.add(docBase);
}
docBase += maxDoc;
}
- perDocBitsStarts.add(docBase);
if (!perDocSlices.isEmpty()) {
- mergeState.multiLiveDocs = new MultiBits(perDocBits, perDocBitsStarts,
- true);
final PerDocConsumer docsConsumer = codec
.docsConsumer(new PerDocWriteState(segmentWriteState));
boolean success = false;
@@ -612,14 +607,6 @@ final class SegmentMerger {
private MergeState mergeState;
- int[][] getDocMaps() {
- return mergeState.docMaps;
- }
-
- int[] getDelCounts() {
- return mergeState.delCounts;
- }
-
public boolean getAnyNonBulkMerges() {
assert matchedCount <= readers.size();
return matchedCount != readers.size();
@@ -635,22 +622,22 @@ final class SegmentMerger {
output = directory.createOutput(IndexFileNames.segmentFileName(segment, "", IndexFileNames.NORMS_EXTENSION), context);
output.writeBytes(SegmentNorms.NORMS_HEADER, SegmentNorms.NORMS_HEADER.length);
}
- for (IndexReader reader : readers) {
- final int maxDoc = reader.maxDoc();
- byte normBuffer[] = reader.norms(fi.name);
+ for (MergeState.IndexReaderAndLiveDocs reader : readers) {
+ final int maxDoc = reader.reader.maxDoc();
+ byte normBuffer[] = reader.reader.norms(fi.name);
if (normBuffer == null) {
// Can be null if this segment doesn't have
// any docs with this field
normBuffer = new byte[maxDoc];
Arrays.fill(normBuffer, (byte)0);
}
- if (!reader.hasDeletions()) {
+ if (reader.liveDocs == null) {
//optimized case for segments without deleted docs
output.writeBytes(normBuffer, maxDoc);
} else {
// this segment has deleted docs, so we have to
// check for every doc if it is deleted or not
- final Bits liveDocs = reader.getLiveDocs();
+ final Bits liveDocs = reader.liveDocs;
for (int k = 0; k < maxDoc; k++) {
if (liveDocs.get(k)) {
output.writeByte(normBuffer[k]);
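Throughout SegmentMerger, deletions are now detected through the per-reader liveDocs bits carried in MergeState.IndexReaderAndLiveDocs rather than reader.hasDeletions()/numDeletedDocs(), and the docMap is built directly from those bits: a deleted doc maps to -1, a live doc receives the next dense new docID, and docBase advances by the number of live docs. A minimal standalone sketch of that remapping, using a hypothetical helper name (buildDocMap is not part of the commit) but mirroring the loop above:

    // deleted docs map to -1; live docs get dense, sequential new docIDs
    static int[] buildDocMap(Bits liveDocs, int maxDoc) {
      final int[] docMap = new int[maxDoc];
      int newDocID = 0;
      for (int j = 0; j < maxDoc; j++) {
        if (liveDocs != null && !liveDocs.get(j)) {
          docMap[j] = -1;
        } else {
          docMap[j] = newDocID++;
        }
      }
      return docMap;  // newDocID is the live-doc count; docBase advances by it
    }

The fields consumer is likewise now closed via IOUtils.closeSafely(!success, consumer): if the merge itself threw, exceptions from close() are suppressed so the original failure propagates instead of being masked.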
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentReader.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentReader.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/SegmentReader.java Tue Aug 23 14:06:58 2011
@@ -31,7 +31,6 @@ import org.apache.lucene.document.Docume
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.index.FieldInfo.IndexOptions;
import org.apache.lucene.index.codecs.PerDocValues;
-import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -161,9 +160,6 @@ public class SegmentReader extends Index
// NOTE: the bitvector is stored using the regular directory, not cfs
if (hasDeletions(si)) {
liveDocs = new BitVector(directory(), si.getDelFileName(), new IOContext(context, true));
- if (liveDocs.getVersion() < BitVector.VERSION_DGAPS_CLEARED) {
- liveDocs.invertAll();
- }
liveDocsRef = new AtomicInteger(1);
assert checkLiveCounts();
if (liveDocs.size() != si.docCount) {
@@ -637,15 +633,6 @@ public class SegmentReader extends Index
}
}
- // NOTE: only called from IndexWriter when a near
- // real-time reader is opened, or applyDeletes is run,
- // sharing a segment that's still being merged. This
- // method is not thread safe, and relies on the
- // synchronization in IndexWriter
- void loadTermsIndex(int indexDivisor) throws IOException {
- core.fields.loadTermsIndex(indexDivisor);
- }
-
// for testing only
boolean normsClosed() {
if (singleNormStream != null) {
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/Terms.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/Terms.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/Terms.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/Terms.java Tue Aug 23 14:06:58 2011
@@ -19,9 +19,11 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Comparator;
+
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
+import org.apache.lucene.util.automaton.CompiledAutomaton;
/**
* Access to the terms in a specific field. See {@link Fields}.
@@ -37,7 +39,40 @@ public abstract class Terms {
/** Returns an iterator that will step through all
* terms. This method will not return null.*/
public abstract TermsEnum iterator() throws IOException;
-
+
+ /** Returns a TermsEnum that iterates over all terms that
+ * are accepted by the provided {@link
+ * CompiledAutomaton}. If the <code>startTerm</code> is
+ * provided then the returned enum will only accept terms
+ * > <code>startTerm</code>, but you still must call
+ * next() first to get to the first term. Note that the
+ * provided <code>startTerm</code> must be accepted by
+ * the automaton.
+ *
+ * <p><b>NOTE</b>: the returned TermsEnum cannot
+ * seek</p>. */
+ public TermsEnum intersect(CompiledAutomaton compiled, final BytesRef startTerm) throws IOException {
+ // TODO: eventually we could support seekCeil/Exact on
+ // the returned enum, instead of only being able to seek
+ // at the start
+ if (compiled.type != CompiledAutomaton.AUTOMATON_TYPE.NORMAL) {
+ throw new IllegalArgumentException("please use CompiledAutomaton.getTermsEnum instead");
+ }
+ if (startTerm == null) {
+ return new AutomatonTermsEnum(iterator(), compiled);
+ } else {
+ return new AutomatonTermsEnum(iterator(), compiled) {
+ @Override
+ protected BytesRef nextSeekTerm(BytesRef term) throws IOException {
+ if (term == null) {
+ term = startTerm;
+ }
+ return super.nextSeekTerm(term);
+ }
+ };
+ }
+ }
+
/** Return the BytesRef Comparator used to sort terms
* provided by the iterator. This method may return null
* if there are no terms. This method may be invoked
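The default Terms.intersect falls back to an AutomatonTermsEnum and, when a startTerm is given, overrides nextSeekTerm so the very first seek begins from startTerm rather than the automaton's smallest accepted term. A short hedged sketch of the startTerm contract described in the javadoc; terms and compiled stand in for a Terms instance and a NORMAL CompiledAutomaton from the surrounding context, and "foo42" is a placeholder that must itself be accepted by the automaton:

    TermsEnum te = terms.intersect(compiled, new BytesRef("foo42"));
    BytesRef t;
    while ((t = te.next()) != null) {
      // each t is greater than "foo42" and accepted by the automaton;
      // the returned enum cannot seek, so it is consumed strictly via next()
    }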
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/TieredMergePolicy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/TieredMergePolicy.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/TieredMergePolicy.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/TieredMergePolicy.java Tue Aug 23 14:06:58 2011
@@ -60,7 +60,7 @@ import java.util.ArrayList;
* <p><b>NOTE</b>: This policy always merges by byte size
* of the segments, always pro-rates by percent deletes,
* and does not apply any maximum segment size during
- * optimize (unlike {@link LogByteSizeMergePolicy}.
+ * optimize (unlike {@link LogByteSizeMergePolicy}).
*
* @lucene.experimental
*/
Modified: lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/codecs/BlockTermState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/codecs/BlockTermState.java?rev=1160700&r1=1160699&r2=1160700&view=diff
==============================================================================
--- lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/codecs/BlockTermState.java (original)
+++ lucene/dev/branches/flexscoring/lucene/src/java/org/apache/lucene/index/codecs/BlockTermState.java Tue Aug 23 14:06:58 2011
@@ -29,10 +29,8 @@ public class BlockTermState extends OrdT
public int docFreq; // how many docs have this term
public long totalTermFreq; // total number of occurrences of this term
- public int termCount; // term ord are in the current block
- public long blockFilePointer; // fp into the terms dict primary file (_X.tib) that holds this term
-
- public int blockTermCount; // how many terms in current block
+ public int termBlockOrd; // the term's ord in the current block
+ public long blockFilePointer; // fp into the terms dict primary file (_X.tim) that holds this term
@Override
public void copyFrom(TermState _other) {
@@ -41,7 +39,7 @@ public class BlockTermState extends OrdT
super.copyFrom(_other);
docFreq = other.docFreq;
totalTermFreq = other.totalTermFreq;
- termCount = other.termCount;
+ termBlockOrd = other.termBlockOrd;
blockFilePointer = other.blockFilePointer;
// NOTE: don't copy blockTermCount;
@@ -51,6 +49,6 @@ public class BlockTermState extends OrdT
@Override
public String toString() {
- return "ord=" + ord + " docFreq=" + docFreq + " totalTermFreq=" + totalTermFreq + " termCount=" + termCount + " blockFP=" + blockFilePointer;
+ return "docFreq=" + docFreq + " totalTermFreq=" + totalTermFreq + " termBlockOrd=" + termBlockOrd + " blockFP=" + blockFilePointer;
}
}