You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2011/08/09 11:22:16 UTC
svn commit: r1155278 - in /lucene/dev/trunk: lucene/
lucene/src/java/org/apache/lucene/index/
lucene/src/test/org/apache/lucene/index/
solr/core/src/test/org/apache/solr/search/
Author: simonw
Date: Tue Aug 9 09:22:15 2011
New Revision: 1155278
URL: http://svn.apache.org/viewvc?rev=1155278&view=rev
Log:
LUCENE-3348: IndexWriter applies wrong deletes during concurrent flush-all
Added:
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java
Modified:
lucene/dev/trunk/lucene/CHANGES.txt
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Tue Aug 9 09:22:15 2011
@@ -548,6 +548,10 @@ Bug fixes
lucene version, you get the old buggy behavior for backwards compatibility.
(Trejkaz, Robert Muir)
+* LUCENE-3348: Fix thread safety hazards in IndexWriter that could
+ rarely cause deletions to be incorrectly applied. (Yonik Seeley,
+ Simon Willnauer, Mike McCandless)
+
New Features
* LUCENE-3290: Added FieldInvertState.numUniqueTerms
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Tue Aug 9 09:22:15 2011
@@ -93,7 +93,7 @@ class BufferedDeletes {
} else {
String s = "gen=" + gen;
if (numTermDeletes.get() != 0) {
- s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
+ s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ") terms=" + terms.keySet();
}
if (queries.size() != 0) {
s += " " + queries.size() + " deleted queries";
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java Tue Aug 9 09:22:15 2011
@@ -372,7 +372,8 @@ class BufferedDeletesStream {
DocsEnum docs = null;
assert checkDeleteTerm(null);
-
+
+ //System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader);
for (Term term : termsIter) {
// Since we visit terms sorted, we gain performance
// by re-using the same TermsEnum and seeking only
@@ -401,6 +402,7 @@ class BufferedDeletesStream {
if (docsEnum != null) {
while (true) {
final int docID = docsEnum.nextDoc();
+ //System.out.println(Thread.currentThread().getName() + " del term=" + term + " doc=" + docID);
if (docID == DocsEnum.NO_MORE_DOCS) {
break;
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java Tue Aug 9 09:22:15 2011
@@ -309,7 +309,7 @@ class DirectoryReader extends IndexReade
buffer.append('(');
final String segmentsFile = segmentInfos.getCurrentSegmentFileName();
if (segmentsFile != null) {
- buffer.append(segmentsFile);
+ buffer.append(segmentsFile).append(":").append(segmentInfos.getVersion());
}
if (writer != null) {
buffer.append(":nrt");
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Aug 9 09:22:15 2011
@@ -165,7 +165,7 @@ final class DocumentsWriter {
}
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
- if (deleteQueue != null) {
+ if (deleteQueue != null && !flushControl.isFullFlush()) {
synchronized (ticketQueue) {
// Freeze and insert the delete flush ticket in the queue
ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
@@ -220,7 +220,7 @@ final class DocumentsWriter {
try {
if (infoStream != null) {
- message("docWriter: abort");
+ message("DW: abort");
}
final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
@@ -324,7 +324,7 @@ final class DocumentsWriter {
final Term delTerm) throws CorruptIndexException, IOException {
boolean maybeMerge = preUpdate();
- final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+ final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
try {
@@ -356,7 +356,8 @@ final class DocumentsWriter {
boolean maybeMerge = preUpdate();
- final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+ final ThreadState perThread = flushControl.obtainAndLock();
+
final DocumentsWriterPerThread flushingDWPT;
try {
@@ -513,6 +514,9 @@ final class DocumentsWriter {
assert newSegment != null;
final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
final BufferedDeletes deletes = newSegment.segmentDeletes;
+ if (infoStream != null) {
+ message(Thread.currentThread().getName() + ": publishFlushedSegment seg-private deletes=" + deletes);
+ }
FrozenBufferedDeletes packet = null;
if (deletes != null && deletes.any()) {
// Segment private delete
@@ -542,7 +546,10 @@ final class DocumentsWriter {
final boolean flushAllThreads()
throws IOException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
-
+ if (infoStream != null) {
+ message(Thread.currentThread().getName() + " startFullFlush");
+ }
+
synchronized (this) {
flushingDeleteQueue = deleteQueue;
/* Cutover to a new delete queue. This must be synced on the flush control
@@ -564,6 +571,9 @@ final class DocumentsWriter {
// If a concurrent flush is still in flight wait for it
flushControl.waitForFlush();
if (!anythingFlushed) { // apply deletes if we did not flush any document
+ if (infoStream != null) {
+ message(Thread.currentThread().getName() + ": flush naked frozen global deletes");
+ }
synchronized (ticketQueue) {
ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
}
@@ -576,6 +586,9 @@ final class DocumentsWriter {
}
final void finishFullFlush(boolean success) {
+ if (infoStream != null) {
+ message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
+ }
assert setFlushingDeleteQueue(null);
if (success) {
// Release the flush lock
@@ -609,7 +622,7 @@ final class DocumentsWriter {
next.lock();
try {
assert !next.isActive();
- } finally {
+ } finally {
next.unlock();
}
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java Tue Aug 9 09:22:15 2011
@@ -16,6 +16,8 @@ package org.apache.lucene.index;
* License for the specific language governing permissions and limitations under
* the License.
*/
+
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
@@ -110,6 +112,7 @@ final class DocumentsWriterDeleteQueue {
*/
void add(Term term, DeleteSlice slice) {
final TermNode termNode = new TermNode(term);
+// System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
add(termNode);
/*
* this is an update request where the term is the updated documents
@@ -175,13 +178,14 @@ final class DocumentsWriterDeleteQueue {
void tryApplyGlobalSlice() {
if (globalBufferLock.tryLock()) {
/*
- * The global buffer must be locked but we don't need to upate them if
+ * The global buffer must be locked but we don't need to update them if
* there is an update going on right now. It is sufficient to apply the
* deletes that have been added after the current in-flight global slices
* tail the next time we can get the lock!
*/
try {
if (updateSlice(globalSlice)) {
+// System.out.println(Thread.currentThread() + ": apply globalSlice");
globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
}
} finally {
@@ -210,6 +214,7 @@ final class DocumentsWriterDeleteQueue {
globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
}
+// System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(
globalBufferedDeletes, false);
globalBufferedDeletes.clear();
@@ -262,6 +267,7 @@ final class DocumentsWriterDeleteQueue {
current = current.next;
assert current != null : "slice property violated between the head on the tail must not be a null node";
current.apply(del, docIDUpto);
+// System.out.println(Thread.currentThread().getName() + ": pull " + current + " docIDUpto=" + docIDUpto);
} while (current != sliceTail);
reset();
}
@@ -330,6 +336,11 @@ final class DocumentsWriterDeleteQueue {
void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
bufferedDeletes.addTerm(item, docIDUpto);
}
+
+ @Override
+ public String toString() {
+ return "del=" + item;
+ }
}
private static final class QueryArrayNode extends Node<Query[]> {
@@ -356,6 +367,11 @@ final class DocumentsWriterDeleteQueue {
bufferedDeletes.addTerm(term, docIDUpto);
}
}
+
+ @Override
+ public String toString() {
+ return "dels=" + Arrays.toString(item);
+ }
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue Aug 9 09:22:15 2011
@@ -18,6 +18,7 @@ package org.apache.lucene.index;
*/
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -362,10 +363,25 @@ public final class DocumentsWriterFlushC
return this.perThreadPool.getActiveThreadState();
}
+ ThreadState obtainAndLock() {
+ final ThreadState perThread = perThreadPool.getAndLock(Thread
+ .currentThread(), documentsWriter);
+ if (perThread.isActive()
+ && perThread.perThread.deleteQueue != documentsWriter.deleteQueue) {
+ // There is a flush-all in process and this DWPT is
+ // now stale -- enroll it for flush and try for
+ // another DWPT:
+ addFlushableState(perThread);
+ }
+ // simply return the ThreadState even in a flush all case since we already hold the lock
+ return perThread;
+ }
+
void markForFullFlush() {
final DocumentsWriterDeleteQueue flushingQueue;
synchronized (this) {
- assert !fullFlush;
+ assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
+ assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;
fullFlush = true;
flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until
@@ -373,9 +389,7 @@ public final class DocumentsWriterFlushC
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
documentsWriter.deleteQueue = newQueue;
}
- final Iterator<ThreadState> allActiveThreads = perThreadPool
- .getActivePerThreadsIterator();
- final ArrayList<DocumentsWriterPerThread> toFlush = new ArrayList<DocumentsWriterPerThread>();
+ final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
while (allActiveThreads.hasNext()) {
final ThreadState next = allActiveThreads.next();
next.lock();
@@ -395,25 +409,7 @@ public final class DocumentsWriterFlushC
// this one is already a new DWPT
continue;
}
- if (next.perThread.getNumDocsInRAM() > 0 ) {
- final DocumentsWriterPerThread dwpt = next.perThread; // just for assert
- synchronized (this) {
- if (!next.flushPending) {
- setFlushPending(next);
- }
- final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
- assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
- assert dwpt == flushingDWPT : "flushControl returned different DWPT";
- toFlush.add(flushingDWPT);
- }
- } else {
- if (closed) {
- next.resetWriter(null); // make this state inactive
- } else {
- // get the new delete queue from DW
- next.perThread.initialize();
- }
- }
+ addFlushableState(next);
} finally {
next.unlock();
}
@@ -425,9 +421,55 @@ public final class DocumentsWriterFlushC
* blocking indexing.*/
pruneBlockedQueue(flushingQueue);
assert assertBlockedFlushes(documentsWriter.deleteQueue);
- flushQueue.addAll(toFlush);
+ flushQueue.addAll(fullFlushBuffer);
+ fullFlushBuffer.clear();
stallControl.updateStalled(this);
}
+ assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
+ }
+
+ private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
+ final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
+ while (allActiveThreads.hasNext()) {
+ final ThreadState next = allActiveThreads.next();
+ next.lock();
+ try {
+ assert !next.isActive() || next.perThread.deleteQueue == queue;
+ } finally {
+ next.unlock();
+ }
+ }
+ return true;
+ }
+
+ private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
+
+ void addFlushableState(ThreadState perThread) {
+ if (documentsWriter.infoStream != null) {
+ documentsWriter.message("FC: " + Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread);
+ }
+ final DocumentsWriterPerThread dwpt = perThread.perThread;
+ assert perThread.isHeldByCurrentThread();
+ assert perThread.isActive();
+ assert fullFlush;
+ assert dwpt.deleteQueue != documentsWriter.deleteQueue;
+ if (dwpt.getNumDocsInRAM() > 0) {
+ synchronized(this) {
+ if (!perThread.flushPending) {
+ setFlushPending(perThread);
+ }
+ final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
+ assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+ assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+ fullFlushBuffer.add(flushingDWPT);
+ }
+ } else {
+ if (closed) {
+ perThread.resetWriter(null); // make this state inactive
+ } else {
+ dwpt.initialize();
+ }
+ }
}
/**
@@ -502,7 +544,7 @@ public final class DocumentsWriterFlushC
/**
* Returns <code>true</code> if a full flush is currently running
*/
- synchronized boolean isFullFlush() { // used by assert
+ synchronized boolean isFullFlush() {
return fullFlush;
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue Aug 9 09:22:15 2011
@@ -47,7 +47,7 @@ public class DocumentsWriterPerThread {
abstract static class IndexingChain {
abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
}
-
+
static final IndexingChain defaultIndexingChain = new IndexingChain() {
@@ -131,7 +131,7 @@ public class DocumentsWriterPerThread {
hasAborted = aborting = true;
try {
if (infoStream != null) {
- message("docWriter: now abort");
+ message("now abort");
}
try {
consumer.abort();
@@ -146,11 +146,11 @@ public class DocumentsWriterPerThread {
} finally {
aborting = false;
if (infoStream != null) {
- message("docWriter: done abort");
+ message("done abort");
}
}
}
-
+ private final static boolean INFO_VERBOSE = false;
final DocumentsWriter parent;
final IndexWriter writer;
final Directory directory;
@@ -223,8 +223,14 @@ public class DocumentsWriterPerThread {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
+ if (INFO_VERBOSE) {
+ message(Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
+ }
+
+ }
+ if (INFO_VERBOSE) {
+ message(Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segment);
}
-
boolean success = false;
try {
try {
@@ -265,8 +271,13 @@ public class DocumentsWriterPerThread {
// this call is synchronized on IndexWriter.segmentInfos
segment = writer.newSegmentName();
assert numDocsInRAM == 0;
+ if (INFO_VERBOSE) {
+ message(Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);
+ }
+ }
+ if (INFO_VERBOSE) {
+ message(Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segment);
}
-
int docCount = 0;
try {
for(Document doc : docs) {
@@ -552,4 +563,11 @@ public class DocumentsWriterPerThread {
this.infoStream = infoStream;
docState.infoStream = infoStream;
}
+
+ @Override
+ public String toString() {
+ return "DocumentsWriterPerThread [pendingDeletes=" + pendingDeletes
+ + ", segment=" + segment + ", aborting=" + aborting + ", numDocsInRAM="
+ + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
+ }
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Tue Aug 9 09:22:15 2011
@@ -447,7 +447,7 @@ final class IndexFileDeleter {
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
if (infoStream != null) {
- message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+ message("now checkpoint \"" + segmentInfos.toString(directory) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
}
// Try again now to delete any previously un-deletable
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue Aug 9 09:22:15 2011
@@ -354,7 +354,7 @@ public class IndexWriter implements Clos
poolReaders = true;
final IndexReader r;
doBeforeFlush();
- final boolean anySegmentFlushed;
+ boolean anySegmentFlushed = false;
/*
* for releasing a NRT reader we must ensure that
* DW doesn't add any segments or deletes until we are
@@ -382,9 +382,13 @@ public class IndexWriter implements Clos
message("return reader version=" + r.getVersion() + " reader=" + r);
}
}
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "getReader");
+ // never reached but javac disagrees:
+ return null;
} finally {
if (!success && infoStream != null) {
- message("hit exception during while NRT reader");
+ message("hit exception during NRT reader");
}
// Done: finish the full flush!
docWriter.finishFullFlush(success);
@@ -2341,6 +2345,10 @@ public class IndexWriter implements Clos
FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
// Lock order IW -> BDS
synchronized (bufferedDeletesStream) {
+ if (infoStream != null) {
+ message("publishFlushedSegment");
+ }
+
if (globalPacket != null && globalPacket.any()) {
bufferedDeletesStream.push(globalPacket);
}
@@ -2354,6 +2362,9 @@ public class IndexWriter implements Clos
// generation right away
nextGen = bufferedDeletesStream.getNextGen();
}
+ if (infoStream != null) {
+ message("publish sets newSegment delGen=" + nextGen);
+ }
newSegment.setBufferedDeletesGen(nextGen);
segmentInfos.add(newSegment);
checkpoint();
@@ -2710,19 +2721,82 @@ public class IndexWriter implements Clos
*/
public final void prepareCommit(Map<String,String> commitUserData) throws CorruptIndexException, IOException {
+ if (infoStream != null) {
+ message("prepareCommit: flush");
+ message(" index before flush " + segString());
+ }
+
if (hitOOM) {
throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
}
- if (pendingCommit != null)
+ if (pendingCommit != null) {
throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
+ }
- if (infoStream != null)
- message("prepareCommit: flush");
+ doBeforeFlush();
+ assert testPoint("startDoFlush");
+ SegmentInfos toCommit = null;
+ boolean anySegmentsFlushed = false;
- flush(true, true);
+ // This is copied from doFlush, except it's modified to
+ // clone & incRef the flushed SegmentInfos inside the
+ // sync block:
+
+ try {
+
+ synchronized (fullFlushLock) {
+ boolean flushSuccess = false;
+ boolean success = false;
+ try {
+ anySegmentsFlushed = docWriter.flushAllThreads();
+ if (!anySegmentsFlushed) {
+ // prevent double increment since docWriter#doFlush increments the flushcount
+ // if we flushed anything.
+ flushCount.incrementAndGet();
+ }
+ flushSuccess = true;
+
+ synchronized(this) {
+ maybeApplyDeletes(true);
+
+ readerPool.commit(segmentInfos);
+
+ // Must clone the segmentInfos while we still
+ // hold fullFlushLock and while sync'd so that
+ // no partial changes (eg a delete w/o
+ // corresponding add from an updateDocument) can
+ // sneak into the commit point:
+ toCommit = (SegmentInfos) segmentInfos.clone();
+
+ pendingCommitChangeCount = changeCount;
+
+ // This protects the segmentInfos we are now going
+ // to commit. This is important in case, eg, while
+ // we are trying to sync all referenced files, a
+ // merge completes which would otherwise have
+ // removed the files we are now syncing.
+ deleter.incRef(toCommit, false);
+ }
+ success = true;
+ } finally {
+ if (!success && infoStream != null) {
+ message("hit exception during prepareCommit");
+ }
+ // Done: finish the full flush!
+ docWriter.finishFullFlush(flushSuccess);
+ doAfterFlush();
+ }
+ }
+ } catch (OutOfMemoryError oom) {
+ handleOOM(oom, "prepareCommit");
+ }
+
+ if (anySegmentsFlushed) {
+ maybeMerge();
+ }
- startCommit(commitUserData);
+ startCommit(toCommit, commitUserData);
}
// Used only by commit, below; lock order is commitLock -> IW
@@ -2913,13 +2987,12 @@ public class IndexWriter implements Clos
} else if (infoStream != null) {
message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
}
-
}
final synchronized void applyAllDeletes() throws IOException {
flushDeletesCount.incrementAndGet();
- final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
- .applyDeletes(readerPool, segmentInfos.asList());
+ final BufferedDeletesStream.ApplyDeletesResult result;
+ result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos.asList());
if (result.anyDeletes) {
checkpoint();
}
@@ -3811,7 +3884,7 @@ public class IndexWriter implements Clos
* if it wasn't already. If that succeeds, then we
* prepare a new segments_N file but do not fully commit
* it. */
- private void startCommit(Map<String,String> commitUserData) throws IOException {
+ private void startCommit(final SegmentInfos toSync, final Map<String,String> commitUserData) throws IOException {
assert testPoint("startStartCommit");
assert pendingCommit == null;
@@ -3822,44 +3895,31 @@ public class IndexWriter implements Clos
try {
- if (infoStream != null)
+ if (infoStream != null) {
message("startCommit(): start");
-
- final SegmentInfos toSync;
- final long myChangeCount;
+ }
synchronized(this) {
assert lastCommitChangeCount <= changeCount;
- myChangeCount = changeCount;
- if (changeCount == lastCommitChangeCount) {
- if (infoStream != null)
+ if (pendingCommitChangeCount == lastCommitChangeCount) {
+ if (infoStream != null) {
message(" skip startCommit(): no changes pending");
+ }
+ deleter.decRef(toSync);
return;
}
- // First, we clone & incref the segmentInfos we intend
- // to sync, then, without locking, we sync() all files
- // referenced by toSync, in the background.
-
- if (infoStream != null)
- message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
-
- readerPool.commit(segmentInfos);
- toSync = (SegmentInfos) segmentInfos.clone();
+ if (infoStream != null) {
+ message("startCommit index=" + segString(toSync) + " changeCount=" + changeCount);
+ }
assert filesExist(toSync);
- if (commitUserData != null)
+ if (commitUserData != null) {
toSync.setUserData(commitUserData);
-
- // This protects the segmentInfos we are now going
- // to commit. This is important in case, eg, while
- // we are trying to sync all referenced files, a
- // merge completes which would otherwise have
- // removed the files we are now syncing.
- deleter.incRef(toSync, false);
+ }
}
assert testPoint("midStartCommit");
@@ -3884,19 +3944,18 @@ public class IndexWriter implements Clos
// an exception)
toSync.prepareCommit(directory);
- pendingCommit = toSync;
pendingCommitSet = true;
- pendingCommitChangeCount = myChangeCount;
+ pendingCommit = toSync;
}
- if (infoStream != null)
+ if (infoStream != null) {
message("done all syncs");
+ }
assert testPoint("midStartCommitSuccess");
} finally {
synchronized(this) {
-
// Have our master segmentInfos record the
// generations we just prepared. We do this
// on error or success so we don't
@@ -3908,6 +3967,7 @@ public class IndexWriter implements Clos
message("hit exception committing segments file");
}
+ // Hit exception
deleter.decRef(toSync);
}
}
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java Tue Aug 9 09:22:15 2011
@@ -715,8 +715,14 @@ public final class SegmentInfo implement
if (getHasVectors()) {
s.append('v');
}
- } catch (IOException e) {
- throw new RuntimeException(e);
+ } catch (Throwable e) {
+ // Messy: because getHasVectors may be used in an
+ // un-thread-safe way, and may attempt to open an fnm
+ // file that has since (legitimately) been deleted by
+ // IndexWriter, instead of throwing these exceptions
+ // up, just add v? to indicate we don't know if this
+ // segment has vectors:
+ s.append("v?");
}
s.append(docCount);
Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java Tue Aug 9 09:22:15 2011
@@ -492,7 +492,7 @@ public class TestIndexWriterOnDiskFull e
fail("fake disk full IOExceptions not hit");
} catch (IOException ioe) {
// expected
- assertTrue(ftdm.didFail1);
+ assertTrue(ftdm.didFail1 || ftdm.didFail2);
}
_TestUtil.checkIndex(dir);
ftdm.clearDoFail();
Added: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java?rev=1155278&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java (added)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java Tue Aug 9 09:22:15 2011
@@ -0,0 +1,383 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
+
+/**
+ * Stress test for near-real-time readers: writer threads concurrently update,
+ * delete and commit/reopen while reader threads search the currently installed
+ * reader and compare the stored value they find against a snapshot of the
+ * expected values (the "committed model").
+ */
+public class TestStressNRT extends LuceneTestCase {
+  /** Currently installed reader; swapped while holding the monitor of {@code this}. */
+  volatile IndexReader reader;
+
+  /** Live model: id -> last value written; negative means deleted or never added. */
+  final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
+  /** Snapshot of {@link #model} that reader threads verify against. */
+  Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
+  /** Hands out a version number to every snapshot taken of the model. */
+  long snapshotCount;
+  /** Version number of the snapshot currently held in {@link #committedModel}. */
+  long committedModelClock;
+  /** Id of a recently changed document; reader threads bias their lookups toward it. */
+  volatile int lastId;
+  /** Name of the stored field that carries a document's value. */
+  final String field = "val_l";
+  /** One lock object per id, used to serialize updates of the same document. */
+  Object[] syncArr;
+
+  /**
+   * Resets the counters, maps each of the {@code ndocs} ids to {@code -1}
+   * ("no document added yet") in the model, creates one lock object per id
+   * and copies the model into the committed model.
+   *
+   * @param ndocs number of distinct document ids used by the test
+   */
+  private void initModel(int ndocs) {
+    snapshotCount = 0;
+    committedModelClock = 0;
+    lastId = 0;
+
+    syncArr = new Object[ndocs];
+
+    for (int i=0; i<ndocs; i++) {
+      model.put(i, -1L);
+      syncArr[i] = new Object();
+    }
+    committedModel.putAll(model);
+  }
+
+  /**
+   * Runs randomly sized groups of writer and reader threads against one index.
+   * Writers either commit/reopen (installing a newer reader together with a
+   * model snapshot) or update/delete a single id; readers look an id up in the
+   * installed reader and fail if the stored value is older than the snapshot.
+   *
+   * @throws Exception if index setup, joining the threads or closing the resources fails
+   */
+  public void test() throws Exception {
+    // update variables
+    final int commitPercent = random.nextInt(20);
+    final int softCommitPercent = random.nextInt(100); // what percent of the commits are soft
+    final int deletePercent = random.nextInt(50);
+    final int deleteByQueryPercent = random.nextInt(25);
+    final int ndocs = atLeast(50);
+    final int nWriteThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
+    final int maxConcurrentCommits = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5); // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
+
+    final boolean tombstones = random.nextBoolean();
+
+    // query variables
+    final AtomicLong operations = new AtomicLong(atLeast(50000)); // number of query operations to perform in total
+
+    final int nReadThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
+    initModel(ndocs);
+
+    if (VERBOSE) {
+      System.out.println("\n");
+      System.out.println("TEST: commitPercent=" + commitPercent);
+      System.out.println("TEST: softCommitPercent=" + softCommitPercent);
+      System.out.println("TEST: deletePercent=" + deletePercent);
+      System.out.println("TEST: deleteByQueryPercent=" + deleteByQueryPercent);
+      System.out.println("TEST: ndocs=" + ndocs);
+      System.out.println("TEST: nWriteThreads=" + nWriteThreads);
+      System.out.println("TEST: nReadThreads=" + nReadThreads);
+      System.out.println("TEST: maxConcurrentCommits=" + maxConcurrentCommits);
+      System.out.println("TEST: tombstones=" + tombstones);
+      System.out.println("TEST: operations=" + operations);
+      System.out.println("\n");
+    }
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+    Directory dir = newDirectory();
+
+    final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+    writer.setDoRandomOptimizeAssert(false);
+    writer.w.setInfoStream(VERBOSE ? System.out : null);
+    writer.commit();
+    reader = IndexReader.open(dir);
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        // per-thread generator, seeded from the test's Random on the main thread
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.get() > 0) {
+              int oper = rand.nextInt(100);
+
+              if (oper < commitPercent) {
+                if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                  Map<Integer,Long> newCommittedModel;
+                  long version;
+                  IndexReader oldReader;
+
+                  synchronized(TestStressNRT.this) {
+                    newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
+                    version = snapshotCount++;
+                    oldReader = reader;
+                    oldReader.incRef(); // increment the reference since we will use this for reopening
+                  }
+
+                  IndexReader newReader;
+                  if (rand.nextInt(100) < softCommitPercent) {
+                    // assertU(h.commit("softCommit","true"));
+                    // draw from this thread's own Random rather than the shared
+                    // test Random, which all worker threads would otherwise
+                    // consume concurrently
+                    if (rand.nextBoolean()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": call writer.getReader");
+                      }
+                      newReader = writer.getReader(true);
+                    } else {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version);
+                      }
+                      newReader = oldReader.reopen(writer.w, true);
+                    }
+                  } else {
+                    // assertU(commit());
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version);
+                    }
+                    writer.commit();
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": now reopen after commit");
+                    }
+                    newReader = oldReader.reopen();
+                  }
+
+                  // Code below assumes newReader comes w/
+                  // extra ref:
+                  if (newReader == oldReader) {
+                    newReader.incRef();
+                  }
+
+                  oldReader.decRef();
+
+                  synchronized(TestStressNRT.this) {
+                    // install the new reader if it's newest (and check the current version since another reader may have already been installed)
+                    //System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion());
+                    assert newReader.getRefCount() > 0;
+                    assert reader.getRefCount() > 0;
+                    if (newReader.getVersion() > reader.getVersion()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader);
+                      }
+                      reader.decRef();
+                      reader = newReader;
+
+                      // Silly: forces fieldInfos to be
+                      // loaded so we don't hit IOE on later
+                      // reader.toString
+                      newReader.toString();
+
+                      // install this snapshot only if it's newer than the current one
+                      if (version >= committedModelClock) {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version);
+                        }
+                        committedModel = newCommittedModel;
+                        committedModelClock = version;
+                      } else {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version);
+                        }
+                      }
+                    } else {
+                      // not newer than the installed reader: release the
+                      // reference this thread holds on newReader
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader);
+                      }
+                      newReader.decRef();
+                    }
+                  }
+                }
+                numCommitting.decrementAndGet();
+              } else {
+
+                int id = rand.nextInt(ndocs);
+                Object sync = syncArr[id];
+
+                // set the lastId before we actually change it sometimes to try and
+                // uncover more race conditions between writing and reading
+                // (per-thread Random, see the note on the soft-commit branch)
+                boolean before = rand.nextBoolean();
+                if (before) {
+                  lastId = id;
+                }
+
+                // We can't concurrently update the same document and retain our invariants of increasing values
+                // since we can't guarantee what order the updates will be executed.
+                synchronized (sync) {
+                  Long val = model.get(id);
+                  long nextVal = Math.abs(val)+1;
+
+                  if (oper < commitPercent + deletePercent) {
+                    // assertU("<delete><id>" + id + "</id></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": term delDocs id:" + id + " nextVal=" + nextVal);
+                    }
+                    writer.deleteDocuments(new Term("id",Integer.toString(id)));
+                    model.put(id, -nextVal);
+                  } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+                    //assertU("<delete><query>id:" + id + "</query></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": query delDocs id:" + id + " nextVal=" + nextVal);
+                    }
+                    writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
+                    model.put(id, -nextVal);
+                  } else {
+                    // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                    Document d = new Document();
+                    d.add(newField("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                    d.add(newField(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": u id:" + id + " val=" + nextVal);
+                    }
+                    writer.updateDocument(new Term("id", Integer.toString(id)), d);
+                    if (tombstones) {
+                      // remove tombstone after new addition (this should be optional?)
+                      writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
+                    }
+                    model.put(id, nextVal);
+                  }
+                }
+
+                if (!before) {
+                  lastId = id;
+                }
+              }
+            }
+          } catch (Throwable e) {
+            // make every other worker's loop condition fail so the test
+            // stops promptly, same as the reader threads do on failure
+            operations.set(-1L);
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        // per-thread generator, seeded from the test's Random on the main thread
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              long val;
+              IndexReader r;
+              synchronized(TestStressNRT.this) {
+                val = committedModel.get(id);
+                r = reader;
+                r.incRef();
+              }
+
+              try {
+                if (VERBOSE) {
+                  System.out.println("TEST: " + Thread.currentThread().getName() + ": s id=" + id + " val=" + val + " r=" + r.getVersion());
+                }
+
+                // sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+                IndexSearcher searcher = new IndexSearcher(r);
+                Query q = new TermQuery(new Term("id",Integer.toString(id)));
+                TopDocs results = searcher.search(q, 10);
+
+                if (results.totalHits == 0 && tombstones) {
+                  // if we couldn't find the doc, look for its tombstone
+                  q = new TermQuery(new Term("id","-"+Integer.toString(id)));
+                  results = searcher.search(q, 1);
+                  if (results.totalHits == 0) {
+                    if (val == -1L) {
+                      // expected... no doc was added yet
+                      continue;
+                    }
+                    fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r);
+                  }
+                }
+
+                if (results.totalHits == 0 && !tombstones) {
+                  // nothing to do - we can't tell anything from a deleted doc without tombstones
+                } else {
+                  // we should have found the document, or its tombstone
+                  if (results.totalHits != 1) {
+                    System.out.println("FAIL: hits id:" + id + " val=" + val);
+                    for(ScoreDoc sd : results.scoreDocs) {
+                      final Document doc = r.document(sd.doc);
+                      System.out.println(" docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field));
+                    }
+                    fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits);
+                  }
+                  Document doc = searcher.doc(results.scoreDocs[0].doc);
+                  long foundVal = Long.parseLong(doc.get(field));
+                  if (foundVal < Math.abs(val)) {
+                    fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r);
+                  }
+                }
+              } finally {
+                // release the reference taken above on every exit path
+                // (normal completion, the early continue, and failures)
+                r.decRef();
+              }
+            }
+          } catch (Throwable e) {
+            operations.set(-1L);
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    writer.close();
+    if (VERBOSE) {
+      System.out.println("TEST: close reader=" + reader);
+    }
+    reader.close();
+    dir.close();
+  }
+}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Tue Aug 9 09:22:15 2011
@@ -16,26 +16,12 @@
*/
package org.apache.solr.search;
-import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.RAMDirectory;
-import org.apache.lucene.util.Version;
import org.apache.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.request.SolrQueryRequest;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.Ignore;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -123,7 +109,7 @@ public class TestRealTimeGet extends Sol
// query variables
final int percentRealtimeQuery = 0; // realtime get is not implemented yet
- final AtomicLong operations = new AtomicLong(0); // number of query operations to perform in total // TODO: once lucene level passes, we can move on to the solr level
+ final AtomicLong operations = new AtomicLong(1000); // number of query operations to perform in total // TODO: once lucene level passes, we can move on to the solr level
int nReadThreads = 10;
initModel(ndocs);
@@ -272,257 +258,5 @@ public class TestRealTimeGet extends Sol
for (Thread thread : threads) {
thread.join();
}
-
- }
-
-
-
-
- IndexReader reader;
-
- @Ignore
- @Test
- public void testStressLuceneNRT() throws Exception {
- // update variables
- final int commitPercent = 10;
- final int softCommitPercent = 50; // what percent of the commits are soft
- final int deletePercent = 8;
- final int deleteByQueryPercent = 4;
- final int ndocs = 100;
- int nWriteThreads = 10;
- final int maxConcurrentCommits = 2; // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
- final boolean tombstones = false;
-
- // query variables
- final AtomicLong operations = new AtomicLong(0); // number of query operations to perform in total // TODO: temporarily high due to lack of stability
- int nReadThreads = 10;
-
- initModel(ndocs);
-
- final AtomicInteger numCommitting = new AtomicInteger();
-
- List<Thread> threads = new ArrayList<Thread>();
-
- RAMDirectory dir = new RAMDirectory();
- final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40)));
- writer.commit();
- reader = IndexReader.open(dir);
-
- for (int i=0; i<nWriteThreads; i++) {
- Thread thread = new Thread("WRITER"+i) {
- Random rand = new Random(random.nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.get() > 0) {
- int oper = rand.nextInt(100);
-
- if (oper < commitPercent) {
- if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
- Map<Integer,Long> newCommittedModel;
- long version;
- IndexReader oldReader;
-
- synchronized(TestRealTimeGet.this) {
- newCommittedModel = new HashMap<Integer,Long>(model); // take a snapshot
- version = snapshotCount++;
- oldReader = reader;
- oldReader.incRef(); // increment the reference since we will use this for reopening
- }
-
- IndexReader newReader;
- if (rand.nextInt(100) < softCommitPercent) {
- // assertU(h.commit("softCommit","true"));
- newReader = oldReader.reopen(writer, true);
- } else {
- // assertU(commit());
- writer.commit();
- newReader = oldReader.reopen();
- }
-
- synchronized(TestRealTimeGet.this) {
- // install the new reader if it's newest (and check the current version since another reader may have already been installed)
- if (newReader.getVersion() > reader.getVersion()) {
- reader.decRef();
- reader = newReader;
-
- // install this snapshot only if it's newer than the current one
- if (version >= committedModelClock) {
- committedModel = newCommittedModel;
- committedModelClock = version;
- }
-
- } else if (newReader != oldReader) {
- // if the same reader, don't decRef.
- newReader.decRef();
- }
-
- oldReader.decRef();
- }
- }
- numCommitting.decrementAndGet();
- continue;
- }
-
-
- int id = rand.nextInt(ndocs);
- Object sync = syncArr[id];
-
- // set the lastId before we actually change it sometimes to try and
- // uncover more race conditions between writing and reading
- boolean before = rand.nextBoolean();
- if (before) {
- lastId = id;
- }
-
- // We can't concurrently update the same document and retain our invariants of increasing values
- // since we can't guarantee what order the updates will be executed.
- synchronized (sync) {
- Long val = model.get(id);
- long nextVal = Math.abs(val)+1;
-
- if (oper < commitPercent + deletePercent) {
- // assertU("<delete><id>" + id + "</id></delete>");
-
- // add tombstone first
- if (tombstones) {
- Document d = new Document();
- d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
- d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
- writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
- }
-
- writer.deleteDocuments(new Term("id",Integer.toString(id)));
- model.put(id, -nextVal);
- } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
- //assertU("<delete><query>id:" + id + "</query></delete>");
-
- // add tombstone first
- if (tombstones) {
- Document d = new Document();
- d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
- d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
- writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
- }
-
- writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
- model.put(id, -nextVal);
- } else {
- // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
- Document d = new Document();
- d.add(new Field("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
- d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
- writer.updateDocument(new Term("id", Integer.toString(id)), d);
-
- if (tombstones) {
- // remove tombstone after new addition (this should be optional?)
- writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
- }
-
- model.put(id, nextVal);
- }
- }
-
- if (!before) {
- lastId = id;
- }
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (int i=0; i<nReadThreads; i++) {
- Thread thread = new Thread("READER"+i) {
- Random rand = new Random(random.nextInt());
-
- @Override
- public void run() {
- try {
- while (operations.decrementAndGet() >= 0) {
- // bias toward a recently changed doc
- int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
-
- // when indexing, we update the index, then the model
- // so when querying, we should first check the model, and then the index
-
- long val;
-
- synchronized(TestRealTimeGet.this) {
- val = committedModel.get(id);
- }
-
-
- IndexReader r;
- synchronized(TestRealTimeGet.this) {
- r = reader;
- r.incRef();
- }
-
- // sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
- IndexSearcher searcher = new IndexSearcher(r);
- Query q = new TermQuery(new Term("id",Integer.toString(id)));
- TopDocs results = searcher.search(q, 1);
-
- if (results.totalHits == 0 && tombstones) {
- // if we couldn't find the doc, look for it's tombstone
- q = new TermQuery(new Term("id","-"+Integer.toString(id)));
- results = searcher.search(q, 1);
- if (results.totalHits == 0) {
- if (val == -1L) {
- // expected... no doc was added yet
- continue;
- }
- fail("No documents or tombstones found for id " + id + ", expected at least " + val);
- }
- }
-
- if (results.totalHits == 0 && !tombstones) {
- // nothing to do - we can't tell anything from a deleted doc without tombstones
- } else {
- assertEquals(1, results.totalHits); // we should have found the document, or it's tombstone
- Document doc = searcher.doc(results.scoreDocs[0].doc);
- long foundVal = Long.parseLong(doc.get(field));
- if (foundVal < Math.abs(val)) {
- System.out.println("model_val="+val+" foundVal="+foundVal);
- }
- assertTrue(foundVal >= Math.abs(val));
- }
-
- r.decRef();
- }
- }
- catch (Throwable e) {
- operations.set(-1L);
- SolrException.log(log,e);
- fail(e.toString());
- }
- }
- };
-
- threads.add(thread);
- }
-
-
- for (Thread thread : threads) {
- thread.start();
- }
-
- for (Thread thread : threads) {
- thread.join();
- }
-
- writer.close();
- reader.close();
-
}
-
-
-
}