You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2011/08/09 11:22:16 UTC

svn commit: r1155278 - in /lucene/dev/trunk: lucene/ lucene/src/java/org/apache/lucene/index/ lucene/src/test/org/apache/lucene/index/ solr/core/src/test/org/apache/solr/search/

Author: simonw
Date: Tue Aug  9 09:22:15 2011
New Revision: 1155278

URL: http://svn.apache.org/viewvc?rev=1155278&view=rev
Log:
LUCENE-3348: IndexWriter applies wrong deletes during concurrent flush-all

Added:
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java
Modified:
    lucene/dev/trunk/lucene/CHANGES.txt
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java

Modified: lucene/dev/trunk/lucene/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/CHANGES.txt?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/CHANGES.txt (original)
+++ lucene/dev/trunk/lucene/CHANGES.txt Tue Aug  9 09:22:15 2011
@@ -548,6 +548,10 @@ Bug fixes
   lucene version, you get the old buggy behavior for backwards compatibility.  
   (Trejkaz, Robert Muir)
 
+* LUCENE-3348: Fix thread safety hazards in IndexWriter that could
+  rarely cause deletions to be incorrectly applied.  (Yonik Seeley,
+  Simon Willnauer, Mike McCandless)
+
 New Features
 
 * LUCENE-3290: Added FieldInvertState.numUniqueTerms 

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletes.java Tue Aug  9 09:22:15 2011
@@ -93,7 +93,7 @@ class BufferedDeletes {
     } else {
       String s = "gen=" + gen;
       if (numTermDeletes.get() != 0) {
-        s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ")";
+        s += " " + numTermDeletes.get() + " deleted terms (unique count=" + terms.size() + ") terms=" + terms.keySet();
       }
       if (queries.size() != 0) {
         s += " " + queries.size() + " deleted queries";

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/BufferedDeletesStream.java Tue Aug  9 09:22:15 2011
@@ -372,7 +372,8 @@ class BufferedDeletesStream {
     DocsEnum docs = null;
 
     assert checkDeleteTerm(null);
-
+    
+    //System.out.println(Thread.currentThread().getName() + " del terms reader=" + reader);
     for (Term term : termsIter) {
       // Since we visit terms sorted, we gain performance
       // by re-using the same TermsEnum and seeking only
@@ -401,6 +402,7 @@ class BufferedDeletesStream {
         if (docsEnum != null) {
           while (true) {
             final int docID = docsEnum.nextDoc();
+            //System.out.println(Thread.currentThread().getName() + " del term=" + term + " doc=" + docID);
             if (docID == DocsEnum.NO_MORE_DOCS) {
               break;
             }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DirectoryReader.java Tue Aug  9 09:22:15 2011
@@ -309,7 +309,7 @@ class DirectoryReader extends IndexReade
     buffer.append('(');
     final String segmentsFile = segmentInfos.getCurrentSegmentFileName();
     if (segmentsFile != null) {
-      buffer.append(segmentsFile);
+      buffer.append(segmentsFile).append(":").append(segmentInfos.getVersion());
     }
     if (writer != null) {
       buffer.append(":nrt");

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Aug  9 09:22:15 2011
@@ -165,7 +165,7 @@ final class DocumentsWriter {
   }
   
   private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
-    if (deleteQueue != null) {
+    if (deleteQueue != null && !flushControl.isFullFlush()) {
       synchronized (ticketQueue) {
         // Freeze and insert the delete flush ticket in the queue
         ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
@@ -220,7 +220,7 @@ final class DocumentsWriter {
 
     try {
       if (infoStream != null) {
-        message("docWriter: abort");
+        message("DW: abort");
       }
 
       final Iterator<ThreadState> threadsIterator = perThreadPool.getActivePerThreadsIterator();
@@ -324,7 +324,7 @@ final class DocumentsWriter {
                           final Term delTerm) throws CorruptIndexException, IOException {
     boolean maybeMerge = preUpdate();
 
-    final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+    final ThreadState perThread = flushControl.obtainAndLock();
     final DocumentsWriterPerThread flushingDWPT;
     
     try {
@@ -356,7 +356,8 @@ final class DocumentsWriter {
 
     boolean maybeMerge = preUpdate();
 
-    final ThreadState perThread = perThreadPool.getAndLock(Thread.currentThread(), this);
+    final ThreadState perThread = flushControl.obtainAndLock();
+
     final DocumentsWriterPerThread flushingDWPT;
     
     try {
@@ -513,6 +514,9 @@ final class DocumentsWriter {
     assert newSegment != null;
     final SegmentInfo segInfo = indexWriter.prepareFlushedSegment(newSegment);
     final BufferedDeletes deletes = newSegment.segmentDeletes;
+    if (infoStream != null) {
+      message(Thread.currentThread().getName() + ": publishFlushedSegment seg-private deletes=" + deletes);  
+    }
     FrozenBufferedDeletes packet = null;
     if (deletes != null && deletes.any()) {
       // Segment private delete
@@ -542,7 +546,10 @@ final class DocumentsWriter {
   final boolean flushAllThreads()
     throws IOException {
     final DocumentsWriterDeleteQueue flushingDeleteQueue;
-
+    if (infoStream != null) {
+      message(Thread.currentThread().getName() + " startFullFlush");
+    }
+    
     synchronized (this) {
       flushingDeleteQueue = deleteQueue;
       /* Cutover to a new delete queue.  This must be synced on the flush control
@@ -564,6 +571,9 @@ final class DocumentsWriter {
       // If a concurrent flush is still in flight wait for it
       flushControl.waitForFlush();  
       if (!anythingFlushed) { // apply deletes if we did not flush any document
+        if (infoStream != null) {
+         message(Thread.currentThread().getName() + ": flush naked frozen global deletes");
+        }
         synchronized (ticketQueue) {
           ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
         }
@@ -576,6 +586,9 @@ final class DocumentsWriter {
   }
   
   final void finishFullFlush(boolean success) {
+    if (infoStream != null) {
+      message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
+    }
     assert setFlushingDeleteQueue(null);
     if (success) {
       // Release the flush lock
@@ -609,7 +622,7 @@ final class DocumentsWriter {
       next.lock();
       try {
         assert !next.isActive();
-      } finally  {
+      } finally {
         next.unlock();
       }
     }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java Tue Aug  9 09:22:15 2011
@@ -16,6 +16,8 @@ package org.apache.lucene.index;
  * License for the specific language governing permissions and limitations under
  * the License.
  */
+
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -110,6 +112,7 @@ final class DocumentsWriterDeleteQueue {
    */
   void add(Term term, DeleteSlice slice) {
     final TermNode termNode = new TermNode(term);
+//    System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
     add(termNode);
     /*
      * this is an update request where the term is the updated documents
@@ -175,13 +178,14 @@ final class DocumentsWriterDeleteQueue {
   void tryApplyGlobalSlice() {
     if (globalBufferLock.tryLock()) {
       /*
-       * The global buffer must be locked but we don't need to upate them if
+       * The global buffer must be locked but we don't need to update them if
        * there is an update going on right now. It is sufficient to apply the
        * deletes that have been added after the current in-flight global slices
        * tail the next time we can get the lock!
        */
       try {
         if (updateSlice(globalSlice)) {
+//          System.out.println(Thread.currentThread() + ": apply globalSlice");
           globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
         }
       } finally {
@@ -210,6 +214,7 @@ final class DocumentsWriterDeleteQueue {
         globalSlice.apply(globalBufferedDeletes, BufferedDeletes.MAX_INT);
       }
 
+//      System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
       final FrozenBufferedDeletes packet = new FrozenBufferedDeletes(
           globalBufferedDeletes, false);
       globalBufferedDeletes.clear();
@@ -262,6 +267,7 @@ final class DocumentsWriterDeleteQueue {
         current = current.next;
         assert current != null : "slice property violated between the head on the tail must not be a null node";
         current.apply(del, docIDUpto);
+//        System.out.println(Thread.currentThread().getName() + ": pull " + current + " docIDUpto=" + docIDUpto);
       } while (current != sliceTail);
       reset();
     }
@@ -330,6 +336,11 @@ final class DocumentsWriterDeleteQueue {
     void apply(BufferedDeletes bufferedDeletes, int docIDUpto) {
       bufferedDeletes.addTerm(item, docIDUpto);
     }
+
+    @Override
+    public String toString() {
+      return "del=" + item;
+    }
   }
 
   private static final class QueryArrayNode extends Node<Query[]> {
@@ -356,6 +367,11 @@ final class DocumentsWriterDeleteQueue {
         bufferedDeletes.addTerm(term, docIDUpto);  
       }
     }
+
+    @Override
+    public String toString() {
+      return "dels=" + Arrays.toString(item);
+    }
   }
 
 

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java Tue Aug  9 09:22:15 2011
@@ -18,6 +18,7 @@ package org.apache.lucene.index;
  */
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -362,10 +363,25 @@ public final class DocumentsWriterFlushC
     return this.perThreadPool.getActiveThreadState();
   }
   
+  ThreadState obtainAndLock() {
+    final ThreadState perThread = perThreadPool.getAndLock(Thread
+        .currentThread(), documentsWriter);
+    if (perThread.isActive()
+        && perThread.perThread.deleteQueue != documentsWriter.deleteQueue) {
+      // There is a flush-all in process and this DWPT is
+      // now stale -- enroll it for flush and try for
+      // another DWPT:
+      addFlushableState(perThread);
+    }
+    // simply return the ThreadState even in a flush all case sine we already hold the lock
+    return perThread;
+  }
+  
   void markForFullFlush() {
     final DocumentsWriterDeleteQueue flushingQueue;
     synchronized (this) {
-      assert !fullFlush;
+      assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
+      assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;
       fullFlush = true;
       flushingQueue = documentsWriter.deleteQueue;
       // Set a new delete queue - all subsequent DWPT will use this queue until
@@ -373,9 +389,7 @@ public final class DocumentsWriterFlushC
       DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
       documentsWriter.deleteQueue = newQueue;
     }
-    final Iterator<ThreadState> allActiveThreads = perThreadPool
-    .getActivePerThreadsIterator();
-    final ArrayList<DocumentsWriterPerThread> toFlush = new ArrayList<DocumentsWriterPerThread>();
+    final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
     while (allActiveThreads.hasNext()) {
       final ThreadState next = allActiveThreads.next();
       next.lock();
@@ -395,25 +409,7 @@ public final class DocumentsWriterFlushC
           // this one is already a new DWPT
           continue;
         }
-        if (next.perThread.getNumDocsInRAM() > 0 ) {
-          final DocumentsWriterPerThread dwpt = next.perThread; // just for assert
-          synchronized (this) {
-            if (!next.flushPending) {
-              setFlushPending(next);
-            }
-            final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(next);
-            assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
-            assert dwpt == flushingDWPT : "flushControl returned different DWPT";
-            toFlush.add(flushingDWPT);
-          }
-        } else {
-          if (closed) {
-            next.resetWriter(null); // make this state inactive
-          } else {
-            // get the new delete queue from DW
-            next.perThread.initialize();
-          }
-        }
+        addFlushableState(next);
       } finally {
         next.unlock();
       }
@@ -425,9 +421,55 @@ public final class DocumentsWriterFlushC
        * blocking indexing.*/
       pruneBlockedQueue(flushingQueue);   
       assert assertBlockedFlushes(documentsWriter.deleteQueue);
-      flushQueue.addAll(toFlush);
+      flushQueue.addAll(fullFlushBuffer);
+      fullFlushBuffer.clear();
       stallControl.updateStalled(this);
     }
+    assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
+  }
+  
+  private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
+    final Iterator<ThreadState> allActiveThreads = perThreadPool.getActivePerThreadsIterator();
+    while (allActiveThreads.hasNext()) {
+      final ThreadState next = allActiveThreads.next();
+      next.lock();
+      try {
+        assert !next.isActive() || next.perThread.deleteQueue == queue;
+      } finally {
+        next.unlock();
+      }
+    }
+    return true;
+  }
+
+  private final List<DocumentsWriterPerThread> fullFlushBuffer = new ArrayList<DocumentsWriterPerThread>();
+
+  void addFlushableState(ThreadState perThread) {
+    if (documentsWriter.infoStream != null) {
+      documentsWriter.message("FC: " + Thread.currentThread().getName() + ": addFlushableState " + perThread.perThread);
+    }
+    final DocumentsWriterPerThread dwpt = perThread.perThread;
+    assert perThread.isHeldByCurrentThread();
+    assert perThread.isActive();
+    assert fullFlush;
+    assert dwpt.deleteQueue != documentsWriter.deleteQueue;
+    if (dwpt.getNumDocsInRAM() > 0) {
+      synchronized(this) {
+        if (!perThread.flushPending) {
+          setFlushPending(perThread);
+        }
+        final DocumentsWriterPerThread flushingDWPT = internalTryCheckOutForFlush(perThread);
+        assert flushingDWPT != null : "DWPT must never be null here since we hold the lock and it holds documents";
+        assert dwpt == flushingDWPT : "flushControl returned different DWPT";
+        fullFlushBuffer.add(flushingDWPT);
+      }
+    } else {
+      if (closed) {
+        perThread.resetWriter(null); // make this state inactive
+      } else {
+        dwpt.initialize();
+      }
+    }
   }
   
   /**
@@ -502,7 +544,7 @@ public final class DocumentsWriterFlushC
   /**
    * Returns <code>true</code> if a full flush is currently running
    */
-  synchronized boolean isFullFlush() { // used by assert
+  synchronized boolean isFullFlush() {
     return fullFlush;
   }
 

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue Aug  9 09:22:15 2011
@@ -47,7 +47,7 @@ public class DocumentsWriterPerThread {
   abstract static class IndexingChain {
     abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread);
   }
-
+  
 
   static final IndexingChain defaultIndexingChain = new IndexingChain() {
 
@@ -131,7 +131,7 @@ public class DocumentsWriterPerThread {
     hasAborted = aborting = true;
     try {
       if (infoStream != null) {
-        message("docWriter: now abort");
+        message("now abort");
       }
       try {
         consumer.abort();
@@ -146,11 +146,11 @@ public class DocumentsWriterPerThread {
     } finally {
       aborting = false;
       if (infoStream != null) {
-        message("docWriter: done abort");
+        message("done abort");
       }
     }
   }
-
+  private final static boolean INFO_VERBOSE = false;
   final DocumentsWriter parent;
   final IndexWriter writer;
   final Directory directory;
@@ -223,8 +223,14 @@ public class DocumentsWriterPerThread {
       // this call is synchronized on IndexWriter.segmentInfos
       segment = writer.newSegmentName();
       assert numDocsInRAM == 0;
+      if (INFO_VERBOSE) {
+        message(Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);  
+      }
+      
+    }
+    if (INFO_VERBOSE) {
+      message(Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segment);
     }
-
     boolean success = false;
     try {
       try {
@@ -265,8 +271,13 @@ public class DocumentsWriterPerThread {
       // this call is synchronized on IndexWriter.segmentInfos
       segment = writer.newSegmentName();
       assert numDocsInRAM == 0;
+      if (INFO_VERBOSE) {
+        message(Thread.currentThread().getName() + " init seg=" + segment + " delQueue=" + deleteQueue);  
+      }
+    }
+    if (INFO_VERBOSE) {
+      message(Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segment);
     }
-
     int docCount = 0;
     try {
       for(Document doc : docs) {
@@ -552,4 +563,11 @@ public class DocumentsWriterPerThread {
     this.infoStream = infoStream;
     docState.infoStream = infoStream;
   }
+
+  @Override
+  public String toString() {
+    return "DocumentsWriterPerThread [pendingDeletes=" + pendingDeletes
+        + ", segment=" + segment + ", aborting=" + aborting + ", numDocsInRAM="
+        + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
+  }
 }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Tue Aug  9 09:22:15 2011
@@ -447,7 +447,7 @@ final class IndexFileDeleter {
   public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
 
     if (infoStream != null) {
-      message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+      message("now checkpoint \"" + segmentInfos.toString(directory) + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
     }
 
     // Try again now to delete any previously un-deletable

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue Aug  9 09:22:15 2011
@@ -354,7 +354,7 @@ public class IndexWriter implements Clos
     poolReaders = true;
     final IndexReader r;
     doBeforeFlush();
-    final boolean anySegmentFlushed;
+    boolean anySegmentFlushed = false;
     /*
      * for releasing a NRT reader we must ensure that 
      * DW doesn't add any segments or deletes until we are
@@ -382,9 +382,13 @@ public class IndexWriter implements Clos
             message("return reader version=" + r.getVersion() + " reader=" + r);
           }
         }
+      } catch (OutOfMemoryError oom) {
+        handleOOM(oom, "getReader");
+        // never reached but javac disagrees:
+        return null;
       } finally {
         if (!success && infoStream != null) {
-          message("hit exception during while NRT reader");
+          message("hit exception during NRT reader");
         }
         // Done: finish the full flush!
         docWriter.finishFullFlush(success);
@@ -2341,6 +2345,10 @@ public class IndexWriter implements Clos
       FrozenBufferedDeletes packet, FrozenBufferedDeletes globalPacket) throws IOException {
     // Lock order IW -> BDS
     synchronized (bufferedDeletesStream) {
+      if (infoStream != null) {
+        message("publishFlushedSegment");  
+      }
+      
       if (globalPacket != null && globalPacket.any()) {
         bufferedDeletesStream.push(globalPacket);
       } 
@@ -2354,6 +2362,9 @@ public class IndexWriter implements Clos
         // generation right away
         nextGen = bufferedDeletesStream.getNextGen();
       }
+      if (infoStream != null) {
+        message("publish sets newSegment delGen=" + nextGen);
+      }
       newSegment.setBufferedDeletesGen(nextGen);
       segmentInfos.add(newSegment);
       checkpoint();
@@ -2710,19 +2721,82 @@ public class IndexWriter implements Clos
    */
   public final void prepareCommit(Map<String,String> commitUserData) throws CorruptIndexException, IOException {
 
+    if (infoStream != null) {
+      message("prepareCommit: flush");
+      message("  index before flush " + segString());
+    }
+
     if (hitOOM) {
       throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
     }
 
-    if (pendingCommit != null)
+    if (pendingCommit != null) {
       throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
+    }
 
-    if (infoStream != null)
-      message("prepareCommit: flush");
+    doBeforeFlush();
+    assert testPoint("startDoFlush");
+    SegmentInfos toCommit = null;
+    boolean anySegmentsFlushed = false;
 
-    flush(true, true);
+    // This is copied from doFlush, except it's modified to
+    // clone & incRef the flushed SegmentInfos inside the
+    // sync block:
+
+    try {
+
+      synchronized (fullFlushLock) {
+        boolean flushSuccess = false;
+        boolean success = false;
+        try {
+          anySegmentsFlushed = docWriter.flushAllThreads();
+          if (!anySegmentsFlushed) {
+            // prevent double increment since docWriter#doFlush increments the flushcount
+            // if we flushed anything.
+            flushCount.incrementAndGet();
+          }
+          flushSuccess = true;
+
+          synchronized(this) {
+            maybeApplyDeletes(true);
+
+            readerPool.commit(segmentInfos);
+
+            // Must clone the segmentInfos while we still
+            // hold fullFlushLock and while sync'd so that
+            // no partial changes (eg a delete w/o
+            // corresponding add from an updateDocument) can
+            // sneak into the commit point:
+            toCommit = (SegmentInfos) segmentInfos.clone();
+
+            pendingCommitChangeCount = changeCount;
+
+            // This protects the segmentInfos we are now going
+            // to commit.  This is important in case, eg, while
+            // we are trying to sync all referenced files, a
+            // merge completes which would otherwise have
+            // removed the files we are now syncing.
+            deleter.incRef(toCommit, false);
+          }
+          success = true;
+        } finally {
+          if (!success && infoStream != null) {
+            message("hit exception during prepareCommit");
+          }
+          // Done: finish the full flush!
+          docWriter.finishFullFlush(flushSuccess);
+          doAfterFlush();
+        }
+      }
+    } catch (OutOfMemoryError oom) {
+      handleOOM(oom, "prepareCommit");
+    }
+
+    if (anySegmentsFlushed) {
+      maybeMerge();
+    }
 
-    startCommit(commitUserData);
+    startCommit(toCommit, commitUserData);
   }
 
   // Used only by commit, below; lock order is commitLock -> IW
@@ -2913,13 +2987,12 @@ public class IndexWriter implements Clos
     } else if (infoStream != null) {
       message("don't apply deletes now delTermCount=" + bufferedDeletesStream.numTerms() + " bytesUsed=" + bufferedDeletesStream.bytesUsed());
     }
-
   }
   
   final synchronized void applyAllDeletes() throws IOException {
     flushDeletesCount.incrementAndGet();
-    final BufferedDeletesStream.ApplyDeletesResult result = bufferedDeletesStream
-      .applyDeletes(readerPool, segmentInfos.asList());
+    final BufferedDeletesStream.ApplyDeletesResult result;
+    result = bufferedDeletesStream.applyDeletes(readerPool, segmentInfos.asList());
     if (result.anyDeletes) {
       checkpoint();
     }
@@ -3811,7 +3884,7 @@ public class IndexWriter implements Clos
    *  if it wasn't already.  If that succeeds, then we
    *  prepare a new segments_N file but do not fully commit
    *  it. */
-  private void startCommit(Map<String,String> commitUserData) throws IOException {
+  private void startCommit(final SegmentInfos toSync, final Map<String,String> commitUserData) throws IOException {
 
     assert testPoint("startStartCommit");
     assert pendingCommit == null;
@@ -3822,44 +3895,31 @@ public class IndexWriter implements Clos
 
     try {
 
-      if (infoStream != null)
+      if (infoStream != null) {
         message("startCommit(): start");
-
-      final SegmentInfos toSync;
-      final long myChangeCount;
+      }
 
       synchronized(this) {
 
         assert lastCommitChangeCount <= changeCount;
-        myChangeCount = changeCount;
 
-        if (changeCount == lastCommitChangeCount) {
-          if (infoStream != null)
+        if (pendingCommitChangeCount == lastCommitChangeCount) {
+          if (infoStream != null) {
             message("  skip startCommit(): no changes pending");
+          }
+          deleter.decRef(toSync);
           return;
         }
 
-        // First, we clone & incref the segmentInfos we intend
-        // to sync, then, without locking, we sync() all files
-        // referenced by toSync, in the background.
-
-        if (infoStream != null)
-          message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
-
-        readerPool.commit(segmentInfos);
-        toSync = (SegmentInfos) segmentInfos.clone();
+        if (infoStream != null) {
+          message("startCommit index=" + segString(toSync) + " changeCount=" + changeCount);
+        }
 
         assert filesExist(toSync);
 
-        if (commitUserData != null)
+        if (commitUserData != null) {
           toSync.setUserData(commitUserData);
-
-        // This protects the segmentInfos we are now going
-        // to commit.  This is important in case, eg, while
-        // we are trying to sync all referenced files, a
-        // merge completes which would otherwise have
-        // removed the files we are now syncing.
-        deleter.incRef(toSync, false);
+        }
       }
 
       assert testPoint("midStartCommit");
@@ -3884,19 +3944,18 @@ public class IndexWriter implements Clos
           // an exception)
           toSync.prepareCommit(directory);
 
-          pendingCommit = toSync;
           pendingCommitSet = true;
-          pendingCommitChangeCount = myChangeCount;
+          pendingCommit = toSync;
         }
 
-        if (infoStream != null)
+        if (infoStream != null) {
           message("done all syncs");
+        }
 
         assert testPoint("midStartCommitSuccess");
 
       } finally {
         synchronized(this) {
-
           // Have our master segmentInfos record the
           // generations we just prepared.  We do this
           // on error or success so we don't
@@ -3908,6 +3967,7 @@ public class IndexWriter implements Clos
               message("hit exception committing segments file");
             }
 
+            // Hit exception
             deleter.decRef(toSync);
           }
         }

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/SegmentInfo.java Tue Aug  9 09:22:15 2011
@@ -715,8 +715,14 @@ public final class SegmentInfo implement
       if (getHasVectors()) {
         s.append('v');
       }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    } catch (Throwable e) {
+      // Messy: because getHasVectors may be used in an
+      // un-thread-safe way, and may attempt to open an fnm
+      // file that has since (legitimately) been deleted by
+      // IndexWriter, instead of throwing these exceptions
+      // up, just add v? to indicate we don't know if this
+      // segment has vectors:
+      s.append("v?");
     }
     s.append(docCount);
 

Modified: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java (original)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterOnDiskFull.java Tue Aug  9 09:22:15 2011
@@ -492,7 +492,7 @@ public class TestIndexWriterOnDiskFull e
       fail("fake disk full IOExceptions not hit");
     } catch (IOException ioe) {
       // expected
-      assertTrue(ftdm.didFail1);
+      assertTrue(ftdm.didFail1 || ftdm.didFail2);
     }
     _TestUtil.checkIndex(dir);
     ftdm.clearDoFail();

Added: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java?rev=1155278&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java (added)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestStressNRT.java Tue Aug  9 09:22:15 2011
@@ -0,0 +1,383 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util._TestUtil;
+
+public class TestStressNRT extends LuceneTestCase {
+  volatile IndexReader reader;
+
+  final ConcurrentHashMap<Integer,Long> model = new ConcurrentHashMap<Integer,Long>();
+  Map<Integer,Long> committedModel = new HashMap<Integer,Long>();
+  long snapshotCount;
+  long committedModelClock;
+  volatile int lastId;
+  final String field = "val_l";
+  Object[] syncArr;
+
+  private void initModel(int ndocs) {
+    snapshotCount = 0;
+    committedModelClock = 0;
+    lastId = 0;
+
+    syncArr = new Object[ndocs];
+
+    for (int i=0; i<ndocs; i++) {
+      model.put(i, -1L);
+      syncArr[i] = new Object();
+    }
+    committedModel.putAll(model);
+  }
+
+  public void test() throws Exception {
+    // update variables
+    final int commitPercent = random.nextInt(20);
+    final int softCommitPercent = random.nextInt(100); // what percent of the commits are soft
+    final int deletePercent = random.nextInt(50);
+    final int deleteByQueryPercent = random.nextInt(25);
+    final int ndocs = atLeast(50);
+    final int nWriteThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
+    final int maxConcurrentCommits = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);   // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
+    
+    final boolean tombstones = random.nextBoolean();
+
+    // query variables
+    final AtomicLong operations = new AtomicLong(atLeast(50000));  // number of query operations to perform in total
+
+    final int nReadThreads = _TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 10 : 5);
+    initModel(ndocs);
+
+    if (VERBOSE) {
+      System.out.println("\n");
+      System.out.println("TEST: commitPercent=" + commitPercent);
+      System.out.println("TEST: softCommitPercent=" + softCommitPercent);
+      System.out.println("TEST: deletePercent=" + deletePercent);
+      System.out.println("TEST: deleteByQueryPercent=" + deleteByQueryPercent);
+      System.out.println("TEST: ndocs=" + ndocs);
+      System.out.println("TEST: nWriteThreads=" + nWriteThreads);
+      System.out.println("TEST: nReadThreads=" + nReadThreads);
+      System.out.println("TEST: maxConcurrentCommits=" + maxConcurrentCommits);
+      System.out.println("TEST: tombstones=" + tombstones);
+      System.out.println("TEST: operations=" + operations);
+      System.out.println("\n");
+    }
+
+    final AtomicInteger numCommitting = new AtomicInteger();
+
+    List<Thread> threads = new ArrayList<Thread>();
+
+    Directory dir = newDirectory();
+
+    final RandomIndexWriter writer = new RandomIndexWriter(random, dir, newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)));
+    writer.setDoRandomOptimizeAssert(false);
+    writer.w.setInfoStream(VERBOSE ? System.out : null);
+    writer.commit();
+    reader = IndexReader.open(dir);
+
+    for (int i=0; i<nWriteThreads; i++) {
+      Thread thread = new Thread("WRITER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.get() > 0) {
+              int oper = rand.nextInt(100);
+
+              if (oper < commitPercent) {
+                if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
+                  Map<Integer,Long> newCommittedModel;
+                  long version;
+                  IndexReader oldReader;
+
+                  synchronized(TestStressNRT.this) {
+                    newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
+                    version = snapshotCount++;
+                    oldReader = reader;
+                    oldReader.incRef();  // increment the reference since we will use this for reopening
+                  }
+
+                  IndexReader newReader;
+                  if (rand.nextInt(100) < softCommitPercent) {
+                    // assertU(h.commit("softCommit","true"));
+                    if (random.nextBoolean()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": call writer.getReader");
+                      }
+                      newReader = writer.getReader(true);
+                    } else {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": reopen reader=" + oldReader + " version=" + version);
+                      }
+                      newReader = oldReader.reopen(writer.w, true);
+                    }
+                  } else {
+                    // assertU(commit());
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": commit+reopen reader=" + oldReader + " version=" + version);
+                    }
+                    writer.commit();
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": now reopen after commit");
+                    }
+                    newReader = oldReader.reopen();
+                  }
+
+                  // Code below assumes newReader comes w/
+                  // extra ref:
+                  if (newReader == oldReader) {
+                    newReader.incRef();
+                  }
+
+                  oldReader.decRef();
+
+                  synchronized(TestStressNRT.this) {
+                    // install the new reader if it's newest (and check the current version since another reader may have already been installed)
+                    //System.out.println(Thread.currentThread().getName() + ": newVersion=" + newReader.getVersion());
+                    assert newReader.getRefCount() > 0;
+                    assert reader.getRefCount() > 0;
+                    if (newReader.getVersion() > reader.getVersion()) {
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": install new reader=" + newReader);
+                      }
+                      reader.decRef();
+                      reader = newReader;
+
+                      // Silly: forces fieldInfos to be
+                      // loaded so we don't hit IOE on later
+                      // reader.toString
+                      newReader.toString();
+
+                      // install this snapshot only if it's newer than the current one
+                      if (version >= committedModelClock) {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": install new model version=" + version);
+                        }
+                        committedModel = newCommittedModel;
+                        committedModelClock = version;
+                      } else {
+                        if (VERBOSE) {
+                          System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new model version=" + version);
+                        }
+                      }
+                    } else {
+                      // if the same reader, don't decRef.
+                      if (VERBOSE) {
+                        System.out.println("TEST: " + Thread.currentThread().getName() + ": skip install new reader=" + newReader);
+                      }
+                      newReader.decRef();
+                    }
+                  }
+                }
+                numCommitting.decrementAndGet();
+              } else {
+
+                int id = rand.nextInt(ndocs);
+                Object sync = syncArr[id];
+
+                // set the lastId before we actually change it sometimes to try and
+                // uncover more race conditions between writing and reading
+                boolean before = random.nextBoolean();
+                if (before) {
+                  lastId = id;
+                }
+
+                // We can't concurrently update the same document and retain our invariants of increasing values
+                // since we can't guarantee what order the updates will be executed.
+                synchronized (sync) {
+                  Long val = model.get(id);
+                  long nextVal = Math.abs(val)+1;
+
+                  if (oper < commitPercent + deletePercent) {
+                    // assertU("<delete><id>" + id + "</id></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": term delDocs id:" + id + " nextVal=" + nextVal);
+                    }
+                    writer.deleteDocuments(new Term("id",Integer.toString(id)));
+                    model.put(id, -nextVal);
+                  } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
+                    //assertU("<delete><query>id:" + id + "</query></delete>");
+
+                    // add tombstone first
+                    if (tombstones) {
+                      Document d = new Document();
+                      d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                      d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                      writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
+                    }
+
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": query delDocs id:" + id + " nextVal=" + nextVal);
+                    }
+                    writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
+                    model.put(id, -nextVal);
+                  } else {
+                    // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
+                    Document d = new Document();
+                    d.add(newField("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
+                    d.add(newField(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
+                    if (VERBOSE) {
+                      System.out.println("TEST: " + Thread.currentThread().getName() + ": u id:" + id + " val=" + nextVal);
+                    }
+                    writer.updateDocument(new Term("id", Integer.toString(id)), d);
+                    if (tombstones) {
+                      // remove tombstone after new addition (this should be optional?)
+                      writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
+                    }
+                    model.put(id, nextVal);
+                  }
+                }
+
+                if (!before) {
+                  lastId = id;
+                }
+              }
+            }
+          } catch (Throwable e) {
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (int i=0; i<nReadThreads; i++) {
+      Thread thread = new Thread("READER"+i) {
+        Random rand = new Random(random.nextInt());
+
+        @Override
+        public void run() {
+          try {
+            while (operations.decrementAndGet() >= 0) {
+              // bias toward a recently changed doc
+              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
+
+              // when indexing, we update the index, then the model
+              // so when querying, we should first check the model, and then the index
+
+              long val;
+              IndexReader r;
+              synchronized(TestStressNRT.this) {
+                val = committedModel.get(id);
+                r = reader;
+                r.incRef();
+              }
+
+              if (VERBOSE) {
+                System.out.println("TEST: " + Thread.currentThread().getName() + ": s id=" + id + " val=" + val + " r=" + r.getVersion());
+              }
+
+              //  sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
+              IndexSearcher searcher = new IndexSearcher(r);
+              Query q = new TermQuery(new Term("id",Integer.toString(id)));
+              TopDocs results = searcher.search(q, 10);
+
+              if (results.totalHits == 0 && tombstones) {
+                // if we couldn't find the doc, look for its tombstone
+                q = new TermQuery(new Term("id","-"+Integer.toString(id)));
+                results = searcher.search(q, 1);
+                if (results.totalHits == 0) {
+                  if (val == -1L) {
+                    // expected... no doc was added yet
+                    r.decRef();
+                    continue;
+                  }
+                  fail("No documents or tombstones found for id " + id + ", expected at least " + val + " reader=" + r);
+                }
+              }
+
+              if (results.totalHits == 0 && !tombstones) {
+                // nothing to do - we can't tell anything from a deleted doc without tombstones
+              } else {
+                // we should have found the document, or its tombstone
+                if (results.totalHits != 1) {
+                  System.out.println("FAIL: hits id:" + id + " val=" + val);
+                  for(ScoreDoc sd : results.scoreDocs) {
+                    final Document doc = r.document(sd.doc);
+                    System.out.println("  docID=" + sd.doc + " id:" + doc.get("id") + " foundVal=" + doc.get(field));
+                  }
+                  fail("id=" + id + " reader=" + r + " totalHits=" + results.totalHits);
+                }
+                Document doc = searcher.doc(results.scoreDocs[0].doc);
+                long foundVal = Long.parseLong(doc.get(field));
+                if (foundVal < Math.abs(val)) {
+                  fail("foundVal=" + foundVal + " val=" + val + " id=" + id + " reader=" + r);
+                }
+              }
+
+              r.decRef();
+            }
+          } catch (Throwable e) {
+            operations.set(-1L);
+            System.out.println(Thread.currentThread().getName() + ": FAILED: unexpected exception");
+            e.printStackTrace(System.out);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+      threads.add(thread);
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    writer.close();
+    if (VERBOSE) {
+      System.out.println("TEST: close reader=" + reader);
+    }
+    reader.close();
+    dir.close();
+  }
+}

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java?rev=1155278&r1=1155277&r2=1155278&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java Tue Aug  9 09:22:15 2011
@@ -16,26 +16,12 @@
  */
 package org.apache.solr.search;
 
-import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.store.RAMDirectory;
-import org.apache.lucene.util.Version;
 import org.apache.noggit.ObjectBuilder;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.request.SolrQueryRequest;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.Ignore;
 
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -123,7 +109,7 @@ public class TestRealTimeGet extends Sol
 
     // query variables
     final int percentRealtimeQuery = 0;   // realtime get is not implemented yet
-    final AtomicLong operations = new AtomicLong(0);  // number of query operations to perform in total       // TODO: once lucene level passes, we can move on to the solr level
+    final AtomicLong operations = new AtomicLong(1000);  // number of query operations to perform in total       // TODO: once lucene level passes, we can move on to the solr level
     int nReadThreads = 10;
 
     initModel(ndocs);
@@ -272,257 +258,5 @@ public class TestRealTimeGet extends Sol
     for (Thread thread : threads) {
       thread.join();
     }
-
-  }
-
-
-
-
-  IndexReader reader;
-
-  @Ignore
-  @Test
-  public void testStressLuceneNRT() throws Exception {
-    // update variables
-    final int commitPercent = 10;
-    final int softCommitPercent = 50; // what percent of the commits are soft
-    final int deletePercent = 8;
-    final int deleteByQueryPercent = 4;
-    final int ndocs = 100;
-    int nWriteThreads = 10;
-    final int maxConcurrentCommits = 2;   // number of committers at a time... needed if we want to avoid commit errors due to exceeding the max
-    final boolean tombstones = false;
-
-    // query variables
-    final AtomicLong operations = new AtomicLong(0);  // number of query operations to perform in total       // TODO: temporarily high due to lack of stability
-    int nReadThreads = 10;
-
-    initModel(ndocs);
-
-    final AtomicInteger numCommitting = new AtomicInteger();
-
-    List<Thread> threads = new ArrayList<Thread>();
-
-    RAMDirectory dir = new RAMDirectory();
-    final IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Version.LUCENE_40, new WhitespaceAnalyzer(Version.LUCENE_40)));
-    writer.commit();
-    reader = IndexReader.open(dir);
-
-    for (int i=0; i<nWriteThreads; i++) {
-      Thread thread = new Thread("WRITER"+i) {
-        Random rand = new Random(random.nextInt());
-
-        @Override
-        public void run() {
-          try {
-            while (operations.get() > 0) {
-              int oper = rand.nextInt(100);
-
-              if (oper < commitPercent) {
-                if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
-                  Map<Integer,Long> newCommittedModel;
-                  long version;
-                  IndexReader oldReader;
-
-                  synchronized(TestRealTimeGet.this) {
-                    newCommittedModel = new HashMap<Integer,Long>(model);  // take a snapshot
-                    version = snapshotCount++;
-                    oldReader = reader;
-                    oldReader.incRef();  // increment the reference since we will use this for reopening
-                  }
-
-                  IndexReader newReader;
-                  if (rand.nextInt(100) < softCommitPercent) {
-                    // assertU(h.commit("softCommit","true"));
-                    newReader = oldReader.reopen(writer, true);
-                  } else {
-                    // assertU(commit());
-                    writer.commit();
-                    newReader = oldReader.reopen();
-                  }
-
-                  synchronized(TestRealTimeGet.this) {
-                    // install the new reader if it's newest (and check the current version since another reader may have already been installed)
-                    if (newReader.getVersion() > reader.getVersion()) {
-                      reader.decRef();
-                      reader = newReader;
-
-                      // install this snapshot only if it's newer than the current one
-                      if (version >= committedModelClock) {
-                        committedModel = newCommittedModel;
-                        committedModelClock = version;
-                      }
-
-                    } else if (newReader != oldReader) {
-                      // if the same reader, don't decRef.
-                      newReader.decRef();
-                    }
-
-                    oldReader.decRef();
-                  }
-                }
-                numCommitting.decrementAndGet();
-                continue;
-              }
-
-
-              int id = rand.nextInt(ndocs);
-              Object sync = syncArr[id];
-
-              // set the lastId before we actually change it sometimes to try and
-              // uncover more race conditions between writing and reading
-              boolean before = rand.nextBoolean();
-              if (before) {
-                lastId = id;
-              }
-
-              // We can't concurrently update the same document and retain our invariants of increasing values
-              // since we can't guarantee what order the updates will be executed.
-              synchronized (sync) {
-                Long val = model.get(id);
-                long nextVal = Math.abs(val)+1;
-
-                if (oper < commitPercent + deletePercent) {
-                  // assertU("<delete><id>" + id + "</id></delete>");
-
-                  // add tombstone first
-                  if (tombstones) {
-                    Document d = new Document();
-                    d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
-                    d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
-                    writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
-                  }
-
-                  writer.deleteDocuments(new Term("id",Integer.toString(id)));
-                  model.put(id, -nextVal);
-                } else if (oper < commitPercent + deletePercent + deleteByQueryPercent) {
-                  //assertU("<delete><query>id:" + id + "</query></delete>");
-
-                  // add tombstone first
-                  if (tombstones) {
-                    Document d = new Document();
-                    d.add(new Field("id","-"+Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
-                    d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
-                    writer.updateDocument(new Term("id", "-"+Integer.toString(id)), d);
-                  }
-
-                  writer.deleteDocuments(new TermQuery(new Term("id", Integer.toString(id))));
-                  model.put(id, -nextVal);
-                } else {
-                  // assertU(adoc("id",Integer.toString(id), field, Long.toString(nextVal)));
-                  Document d = new Document();
-                  d.add(new Field("id",Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED_NO_NORMS));
-                  d.add(new Field(field, Long.toString(nextVal), Field.Store.YES, Field.Index.NO));
-                  writer.updateDocument(new Term("id", Integer.toString(id)), d);
-
-                  if (tombstones) {
-                    // remove tombstone after new addition (this should be optional?)
-                    writer.deleteDocuments(new Term("id","-"+Integer.toString(id)));
-                  }
-
-                  model.put(id, nextVal);
-                }
-              }
-
-              if (!before) {
-                lastId = id;
-              }
-            }
-          } catch (Exception  ex) {
-            throw new RuntimeException(ex);
-          }
-        }
-      };
-
-      threads.add(thread);
-    }
-
-
-    for (int i=0; i<nReadThreads; i++) {
-      Thread thread = new Thread("READER"+i) {
-        Random rand = new Random(random.nextInt());
-
-        @Override
-        public void run() {
-          try {
-            while (operations.decrementAndGet() >= 0) {
-              // bias toward a recently changed doc
-              int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
-
-              // when indexing, we update the index, then the model
-              // so when querying, we should first check the model, and then the index
-
-              long val;
-
-              synchronized(TestRealTimeGet.this) {
-                val = committedModel.get(id);
-              }
-
-
-              IndexReader r;
-              synchronized(TestRealTimeGet.this) {
-                r = reader;
-                r.incRef();
-              }
-
-              //  sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
-              IndexSearcher searcher = new IndexSearcher(r);
-              Query q = new TermQuery(new Term("id",Integer.toString(id)));
-              TopDocs results = searcher.search(q, 1);
-
-              if (results.totalHits == 0 && tombstones) {
-                // if we couldn't find the doc, look for it's tombstone
-                q = new TermQuery(new Term("id","-"+Integer.toString(id)));
-                results = searcher.search(q, 1);
-                if (results.totalHits == 0) {
-                  if (val == -1L) {
-                    // expected... no doc was added yet
-                    continue;
-                  }
-                  fail("No documents or tombstones found for id " + id + ", expected at least " + val);
-                }
-              }
-
-              if (results.totalHits == 0 && !tombstones) {
-                // nothing to do - we can't tell anything from a deleted doc without tombstones
-              } else {
-                assertEquals(1, results.totalHits);   // we should have found the document, or it's tombstone
-                Document doc = searcher.doc(results.scoreDocs[0].doc);
-                long foundVal = Long.parseLong(doc.get(field));
-                if (foundVal < Math.abs(val)) {
-                  System.out.println("model_val="+val+" foundVal="+foundVal);
-                }
-                assertTrue(foundVal >= Math.abs(val));
-              }
-
-              r.decRef();
-            }
-          }
-          catch (Throwable e) {
-            operations.set(-1L);
-            SolrException.log(log,e);
-            fail(e.toString());
-          }
-        }
-      };
-
-      threads.add(thread);
-    }
-
-
-    for (Thread thread : threads) {
-      thread.start();
-    }
-
-    for (Thread thread : threads) {
-      thread.join();
-    }
-
-    writer.close();
-    reader.close();
-
   }
-
-
-
 }