You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2011/11/04 21:16:08 UTC

svn commit: r1197742 - in /lucene/dev/trunk/lucene/src: java/org/apache/lucene/index/DocumentsWriter.java java/org/apache/lucene/index/IndexWriter.java test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java

Author: simonw
Date: Fri Nov  4 20:16:08 2011
New Revision: 1197742

URL: http://svn.apache.org/viewvc?rev=1197742&view=rev
Log:
LUCENE-3551: IW#nrtIsCurrent returns false-positive when full flush runs concurrently

Added:
    lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java   (with props)
Modified:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1197742&r1=1197741&r2=1197742&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Fri Nov  4 20:16:08 2011
@@ -117,7 +117,14 @@ final class DocumentsWriter {
 
   // TODO: cut over to BytesRefHash in BufferedDeletes
   volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
-  private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
+  private final TicketQueue ticketQueue = new TicketQueue();
+  /*
+   * we preserve changes during a full flush since IW might not checkout before
+   * we release all changes. NRT Readers otherwise suddenly return true from
+   * isCurrent while there are actually changes currently committed. See also
+   * #anyChanges() & #flushAllThreads
+   */
+  private volatile boolean pendingChangesInCurrentFullFlush;
 
   private Collection<String> abortedFiles;               // List of files that were written before last abort()
 
@@ -170,6 +177,7 @@ final class DocumentsWriter {
   private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
     if (deleteQueue != null && !flushControl.isFullFlush()) {
       synchronized (ticketQueue) {
+        ticketQueue.incTicketCount();// first inc the ticket count - freeze opens a window for #anyChanges to fail
         // Freeze and insert the delete flush ticket in the queue
         ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
         applyFlushTickets();
@@ -256,9 +264,22 @@ final class DocumentsWriter {
   }
 
   boolean anyChanges() {
-    return numDocsInRAM.get() != 0 || anyDeletions();
+    if (infoStream != null) {
+      message("docWriter: anyChanges? numDocsInRam=" + numDocsInRAM.get()
+          + " deletes=" + anyDeletions() + " hasTickets:"
+          + ticketQueue.hasTickets() + " pendingChangesInFullFlush: "
+          + pendingChangesInCurrentFullFlush);
+    }
+    /*
+     * changes are either in a DWPT or in the deleteQueue.
+     * yet if we currently flush deletes and / or dwpt there
+     * could be a window where all changes are in the ticket queue
+     * before they are published to the IW. ie we need to check if the 
+     * ticket queue has any tickets.
+     */
+    return numDocsInRAM.get() != 0 || anyDeletions() || ticketQueue.hasTickets() || pendingChangesInCurrentFullFlush;
   }
-
+  
   public int getBufferedDeleteTermsSize() {
     return deleteQueue.getBufferedDeleteTermsSize();
   }
@@ -283,7 +304,7 @@ final class DocumentsWriter {
     if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
       // Help out flushing any queued DWPTs so we can un-stall:
       if (infoStream != null) {
-        message("DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
+        message("docWriter: DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
       }
       do {
         // Try pick up pending threads here if possible
@@ -417,7 +438,7 @@ final class DocumentsWriter {
           synchronized (ticketQueue) {
             // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
             ticket =  new FlushTicket(flushingDWPT.prepareFlush(), true);
-            ticketQueue.add(ticket);
+            ticketQueue.incrementAndAdd(ticket);
           }
   
           // flush concurrently without locking
@@ -474,8 +495,11 @@ final class DocumentsWriter {
         // Keep publishing eligible flushed segments:
         final FlushTicket head = ticketQueue.peek();
         if (head != null && head.canPublish()) {
-          ticketQueue.poll();
-          finishFlush(head.segment, head.frozenDeletes);
+          try {
+            finishFlush(head.segment, head.frozenDeletes);
+          } finally {
+            ticketQueue.poll();
+          }
         } else {
           break;
         }
@@ -489,7 +513,7 @@ final class DocumentsWriter {
     if (newSegment == null) {
       assert bufferedDeletes != null;
       if (bufferedDeletes != null && bufferedDeletes.any()) {
-        indexWriter.bufferedDeletesStream.push(bufferedDeletes);
+        indexWriter.publishFrozenDeletes(bufferedDeletes);
         if (infoStream != null) {
           message("flush: push buffered deletes: " + bufferedDeletes);
         }
@@ -535,6 +559,7 @@ final class DocumentsWriter {
   
   // for asserts
   private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
+
   // for asserts
   private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
     currentFullFlushDelQueue = session;
@@ -554,6 +579,7 @@ final class DocumentsWriter {
     }
     
     synchronized (this) {
+      pendingChangesInCurrentFullFlush = anyChanges();
       flushingDeleteQueue = deleteQueue;
       /* Cutover to a new delete queue.  This must be synced on the flush control
        * otherwise a new DWPT could sneak into the loop with an already flushing
@@ -573,11 +599,12 @@ final class DocumentsWriter {
       }
       // If a concurrent flush is still in flight wait for it
       flushControl.waitForFlush();  
-      if (!anythingFlushed) { // apply deletes if we did not flush any document
+      if (!anythingFlushed && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
         if (infoStream != null) {
          message(Thread.currentThread().getName() + ": flush naked frozen global deletes");
         }
         synchronized (ticketQueue) {
+          ticketQueue.incTicketCount(); // first inc the ticket count - freeze opens a window for #anyChanges to fail
           ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
         }
         applyFlushTickets();
@@ -590,16 +617,21 @@ final class DocumentsWriter {
   }
   
   final void finishFullFlush(boolean success) {
-    if (infoStream != null) {
-      message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
-    }
-    assert setFlushingDeleteQueue(null);
-    if (success) {
-      // Release the flush lock
-      flushControl.finishFullFlush();
-    } else {
-      flushControl.abortFullFlushes();
+    try {
+      if (infoStream != null) {
+        message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
+      }
+      assert setFlushingDeleteQueue(null);
+      if (success) {
+        // Release the flush lock
+        flushControl.finishFullFlush();
+      } else {
+        flushControl.abortFullFlushes();
+      }
+    } finally {
+      pendingChangesInCurrentFullFlush = false;
     }
+    
   }
 
   static final class FlushTicket {
@@ -618,6 +650,46 @@ final class DocumentsWriter {
     }
   }
   
+  static final class TicketQueue {
+    private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
+    final AtomicInteger ticketCount = new AtomicInteger();
+    
+    void incTicketCount() {
+      ticketCount.incrementAndGet();
+    }
+    
+    public boolean hasTickets() {
+      assert ticketCount.get() >= 0;
+      return ticketCount.get() != 0;
+    }
+
+    void incrementAndAdd(FlushTicket ticket) {
+      incTicketCount();
+      add(ticket);
+    }
+    
+    void add(FlushTicket ticket) {
+      queue.add(ticket);
+    }
+    
+    FlushTicket peek() {
+      return queue.peek();
+    }
+    
+    FlushTicket poll() {
+      try {
+        return queue.poll();
+      } finally {
+        ticketCount.decrementAndGet();
+      }
+    }
+    
+    void clear() {
+      queue.clear();
+      ticketCount.set(0);
+    }
+  }
+  
   // use by IW during close to assert all DWPT are inactive after final flush
   boolean assertNoActiveDWPT() {
     Iterator<ThreadState> activePerThreadsIterator = perThreadPool.getAllPerThreadsIterator();

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1197742&r1=1197741&r2=1197742&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java Fri Nov  4 20:16:08 2011
@@ -2354,6 +2354,13 @@ public class IndexWriter implements Clos
     return newSegment;
   }
   
+  synchronized void publishFrozenDeletes(FrozenBufferedDeletes packet) throws IOException {
+    assert packet != null && packet.any();
+    synchronized (bufferedDeletesStream) {
+      bufferedDeletesStream.push(packet);
+    }
+  }
+  
   /**
    * Atomically adds the segment private delete packet and publishes the flushed
    * segments SegmentInfo to the index writer. NOTE: use
@@ -2984,14 +2991,14 @@ public class IndexWriter implements Clos
       final boolean anySegmentFlushed;
       
       synchronized (fullFlushLock) {
+    	boolean flushSuccess = false;
         try {
           anySegmentFlushed = docWriter.flushAllThreads();
-          success = true;
+          flushSuccess = true;
         } finally {
-          docWriter.finishFullFlush(success);
+          docWriter.finishFullFlush(flushSuccess);
         }
       }
-      success = false;
       synchronized(this) {
         maybeApplyDeletes(applyAllDeletes);
         doAfterFlush();
@@ -4074,6 +4081,10 @@ public class IndexWriter implements Clos
   synchronized boolean nrtIsCurrent(SegmentInfos infos) {
     //System.out.println("IW.nrtIsCurrent " + (infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any()));
     ensureOpen();
+    if (infoStream != null) {
+      message("nrtIsCurrent: infoVersion matches: " + (infos.version == segmentInfos.version) + " DW changes: " + docWriter.anyChanges() + " BD changes: "+bufferedDeletesStream.any());
+
+    }
     return infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any();
   }
 

Added: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java?rev=1197742&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java (added)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java Fri Nov  4 20:16:08 2011
@@ -0,0 +1,203 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestIndexWriterNRTIsCurrent extends LuceneTestCase {
+
+  public static class ReaderHolder {
+    volatile IndexReader reader;
+    volatile boolean stop = false;
+  }
+
+  public void testIsCurrentWithThreads() throws CorruptIndexException,
+      LockObtainFailedException, IOException, InterruptedException {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT,
+        new MockAnalyzer(random));
+    IndexWriter writer = new IndexWriter(dir, conf);
+    if (VERBOSE) {
+      writer.setInfoStream(System.out);
+    }
+    ReaderHolder holder = new ReaderHolder();
+    ReaderThread[] threads = new ReaderThread[atLeast(3)];
+    final CountDownLatch latch = new CountDownLatch(1);
+    WriterThread writerThread = new WriterThread(holder, writer,
+        atLeast(500), random, latch);
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new ReaderThread(holder, latch);
+      threads[i].start();
+    }
+    writerThread.start();
+
+    writerThread.join();
+    boolean failed = writerThread.failed != null;
+    if (failed)
+      writerThread.failed.printStackTrace();
+    for (int i = 0; i < threads.length; i++) {
+      threads[i].join();
+      if (threads[i].failed != null) {
+        threads[i].failed.printStackTrace();
+        failed = true;
+      }
+    }
+    assertFalse(failed);
+    writer.close();
+    dir.close();
+
+  }
+
+  public static class WriterThread extends Thread {
+    private final ReaderHolder holder;
+    private final IndexWriter writer;
+    private final int numOps;
+    private final Random random;
+    private boolean countdown = true;
+    private final CountDownLatch latch;
+    Throwable failed;
+
+    WriterThread(ReaderHolder holder, IndexWriter writer, int numOps,
+        Random random, CountDownLatch latch) {
+      super();
+      this.holder = holder;
+      this.writer = writer;
+      this.numOps = numOps;
+      this.random = random;
+      this.latch = latch;
+    }
+
+    public void run() {
+      IndexReader currentReader = null;
+      try {
+        Document doc = new Document();
+        doc.add(new Field("id", "1", TextField.TYPE_UNSTORED));
+        writer.addDocument(doc);
+        holder.reader = currentReader = writer.getReader(true);
+        Term term = new Term("id");
+        for (int i = 0; i < numOps && !holder.stop; i++) {
+          float nextOp = random.nextFloat();
+          if (nextOp < 0.3) {
+            term.set("id", "1");
+            writer.updateDocument(term, doc);
+          } else if (nextOp < 0.5) {
+            writer.addDocument(doc);
+          } else {
+            term.set("id", "1");
+            writer.deleteDocuments(term);
+          }
+          if (holder.reader != currentReader) {
+            holder.reader = currentReader;
+            if (countdown) {
+              countdown = false;
+              latch.countDown();
+            }
+          }
+          if (random.nextBoolean()) {
+            writer.commit();
+            final IndexReader newReader = IndexReader
+                .openIfChanged(currentReader);
+            if (newReader != null) { 
+              currentReader.decRef();
+              currentReader = newReader;
+            }
+            if (currentReader.numDocs() == 0) {
+              writer.addDocument(doc);
+            }
+          }
+        }
+      } catch (Throwable e) {
+        failed = e;
+      } finally {
+        holder.reader = null;
+        if (countdown) {
+          latch.countDown();
+        }
+        if (currentReader != null) {
+          try {
+            currentReader.decRef();
+          } catch (IOException e) {
+          }
+        }
+      }
+      if (VERBOSE) {
+        System.out.println("writer stopped - forced by reader: " + holder.stop);
+      }
+    }
+    
+  }
+
+  public static final class ReaderThread extends Thread {
+    private final ReaderHolder holder;
+    private final CountDownLatch latch;
+    Throwable failed;
+
+    ReaderThread(ReaderHolder holder, CountDownLatch latch) {
+      super();
+      this.holder = holder;
+      this.latch = latch;
+    }
+
+    public void run() {
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        failed = e;
+        return;
+      }
+      IndexReader reader;
+      while ((reader = holder.reader) != null) {
+        if (reader.tryIncRef()) {
+          try {
+            boolean current = reader.isCurrent();
+            if (VERBOSE) {
+              System.out.println("Thread: " + Thread.currentThread() + " Reader: " + reader + " isCurrent:" + current);
+            }
+
+            assertFalse(current);
+          } catch (Throwable e) {
+            if (VERBOSE) {
+              System.out.println("FAILED Thread: " + Thread.currentThread() + " Reader: " + reader + " isCurrent: false");
+            }
+            failed = e;
+            holder.stop = true;
+            return;
+          } finally {
+            try {
+              reader.decRef();
+            } catch (IOException e) {
+              if (failed == null) {
+                failed = e;
+              }
+              return;
+            }
+          }
+        }
+      }
+    }
+  }
+}