You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/01/16 16:07:04 UTC

svn commit: r1232017 - in /lucene/dev/trunk/lucene/src/java/org/apache/lucene/index: DocumentsWriter.java DocumentsWriterFlushQueue.java

Author: simonw
Date: Mon Jan 16 15:07:04 2012
New Revision: 1232017

URL: http://svn.apache.org/viewvc?rev=1232017&view=rev
Log:
LUCENE-3692: DocumentsWriter blocks flushes when applyDeletes takes forever

Added:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
Modified:
    lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java

Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1232017&r1=1232016&r2=1232017&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Jan 16 15:07:04 2012
@@ -20,13 +20,12 @@ package org.apache.lucene.index;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
 import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
 import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
 import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
@@ -117,7 +116,7 @@ final class DocumentsWriter {
 
   // TODO: cut over to BytesRefHash in BufferedDeletes
   volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
-  private final TicketQueue ticketQueue = new TicketQueue();
+  private final DocumentsWriterFlushQueue ticketQueue = new DocumentsWriterFlushQueue();
   /*
    * we preserve changes during a full flush since IW might not checkout before
    * we release all changes. NRT Readers otherwise suddenly return true from
@@ -177,12 +176,7 @@ final class DocumentsWriter {
   
   private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
     if (deleteQueue != null && !flushControl.isFullFlush()) {
-      synchronized (ticketQueue) {
-        ticketQueue.incTicketCount();// first inc the ticket count - freeze opens a window for #anyChanges to fail
-        // Freeze and insert the delete flush ticket in the queue
-        ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
-        applyFlushTickets();
-      }
+      ticketQueue.addDeletesAndPurge(this, deleteQueue);
     }
     indexWriter.applyAllDeletes();
     indexWriter.flushCount.incrementAndGet();
@@ -401,7 +395,7 @@ final class DocumentsWriter {
     while (flushingDWPT != null) {
       maybeMerge = true;
       boolean success = false;
-      FlushTicket ticket = null;
+      SegmentFlushTicket ticket = null;
       try {
         assert currentFullFlushDelQueue == null
             || flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
@@ -422,34 +416,27 @@ final class DocumentsWriter {
          * might miss to deletes documents in 'A'.
          */
         try {
-          synchronized (ticketQueue) {
-            // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
-            ticket =  new FlushTicket(flushingDWPT.prepareFlush(), true);
-            ticketQueue.incrementAndAdd(ticket);
-          }
+          // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
+          ticket = ticketQueue.addFlushTicket(flushingDWPT);
   
           // flush concurrently without locking
           final FlushedSegment newSegment = flushingDWPT.flush();
-          synchronized (ticketQueue) {
-            ticket.segment = newSegment;
-          }
+          ticketQueue.addSegment(ticket, newSegment);
           // flush was successful once we reached this point - new seg. has been assigned to the ticket!
           success = true;
         } finally {
           if (!success && ticket != null) {
-            synchronized (ticketQueue) {
-              // In the case of a failure make sure we are making progress and
-              // apply all the deletes since the segment flush failed since the flush
-              // ticket could hold global deletes see FlushTicket#canPublish()
-              ticket.isSegmentFlush = false;
-            }
+            // In the case of a failure make sure we are making progress and
+            // apply all the deletes since the segment flush failed since the flush
+            // ticket could hold global deletes see FlushTicket#canPublish()
+            ticketQueue.markTicketFailed(ticket);
           }
         }
         /*
          * Now we are done and try to flush the ticket queue if the head of the
          * queue has already finished the flush.
          */
-        applyFlushTickets();
+        ticketQueue.tryPurge(this);
       } finally {
         flushControl.doAfterFlush(flushingDWPT);
         flushingDWPT.checkAndResetHasAborted();
@@ -476,25 +463,7 @@ final class DocumentsWriter {
     return maybeMerge;
   }
 
-  private void applyFlushTickets() throws IOException {
-    synchronized (ticketQueue) {
-      while (true) {
-        // Keep publishing eligible flushed segments:
-        final FlushTicket head = ticketQueue.peek();
-        if (head != null && head.canPublish()) {
-          try {
-            finishFlush(head.segment, head.frozenDeletes);
-          } finally {
-            ticketQueue.poll();
-          }
-        } else {
-          break;
-        }
-      }
-    }
-  }
-
-  private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+  void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
       throws IOException {
     // Finish the flushed segment and publish it to IndexWriter
     if (newSegment == null) {
@@ -590,13 +559,11 @@ final class DocumentsWriter {
         if (infoStream.isEnabled("DW")) {
           infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
         }
-        synchronized (ticketQueue) {
-          ticketQueue.incTicketCount(); // first inc the ticket count - freeze opens a window for #anyChanges to fail
-          ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
-        }
-        applyFlushTickets();
+        ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue);
+      } else {
+        ticketQueue.forcePurge(this);
       }
-      assert !flushingDeleteQueue.anyChanges();
+      assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
     } finally {
       assert flushingDeleteQueue == currentFullFlushDelQueue;
     }
@@ -621,61 +588,8 @@ final class DocumentsWriter {
     
   }
 
-  static final class FlushTicket {
-    final FrozenBufferedDeletes frozenDeletes;
-    /* access to non-final members must be synchronized on DW#ticketQueue */
-    FlushedSegment segment;
-    boolean isSegmentFlush;
-    
-    FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) {
-      this.frozenDeletes = frozenDeletes;
-      this.isSegmentFlush = isSegmentFlush;
-    }
-    
-    boolean canPublish() {
-      return (!isSegmentFlush || segment != null);  
-    }
-  }
   
-  static final class TicketQueue {
-    private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
-    final AtomicInteger ticketCount = new AtomicInteger();
-    
-    void incTicketCount() {
-      ticketCount.incrementAndGet();
-    }
-    
-    public boolean hasTickets() {
-      assert ticketCount.get() >= 0;
-      return ticketCount.get() != 0;
-    }
-
-    void incrementAndAdd(FlushTicket ticket) {
-      incTicketCount();
-      add(ticket);
-    }
-    
-    void add(FlushTicket ticket) {
-      queue.add(ticket);
-    }
-    
-    FlushTicket peek() {
-      return queue.peek();
-    }
-    
-    FlushTicket poll() {
-      try {
-        return queue.poll();
-      } finally {
-        ticketCount.decrementAndGet();
-      }
-    }
-    
-    void clear() {
-      queue.clear();
-      ticketCount.set(0);
-    }
-  }
+ 
   
   // use by IW during close to assert all DWPT are inactive after final flush
   boolean assertNoActiveDWPT() {

Added: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java?rev=1232017&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java (added)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java Mon Jan 16 15:07:04 2012
@@ -0,0 +1,211 @@
+package org.apache.lucene.index;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
+
+
+/**
+ * @lucene.internal 
+ */
+public class DocumentsWriterFlushQueue {
+  private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
+  // we track tickets separately since count must be present even before the ticket is
+  // constructed ie. queue.size would not reflect it.
+  private final AtomicInteger ticketCount = new AtomicInteger();
+  private final ReentrantLock purgeLock = new ReentrantLock();
+
+  synchronized void addDeletesAndPurge(DocumentsWriter writer,
+      DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+   incTickets();// first inc the ticket count - freeze opens
+                // a window for #anyChanges to fail
+    boolean success = false;
+    try {
+      queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
+      success = true;
+    } finally {
+      if (!success) {
+       decTickets();
+      }
+    }
+    forcePurge(writer);
+  }
+  
+  private void incTickets() {
+    int numTickets = ticketCount.incrementAndGet();
+    assert numTickets > 0;
+  }
+  
+  private void decTickets() {
+    int numTickets = ticketCount.decrementAndGet();
+    assert numTickets >= 0;
+  }
+
+  synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
+    // Each flush is assigned a ticket in the order they acquire the ticketQueue
+    // lock
+    incTickets();
+    boolean success = false;
+    try {
+      // prepare flush freezes the global deletes - do in synced block!
+      final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
+      queue.add(ticket);
+      success = true;
+      return ticket;
+    } finally {
+      if (!success) {
+        decTickets();
+      }
+    }
+  }
+  
+  synchronized void addSegment(SegmentFlushTicket ticket, FlushedSegment segment) {
+    // the actual flush is done asynchronously and once done the FlushedSegment
+    // is passed to the flush ticket
+    ticket.setSegment(segment);
+  }
+
+  synchronized void markTicketFailed(SegmentFlushTicket ticket) {
+    // to free the queue we mark tickets as failed just to clean up the queue.
+    ticket.setFailed();
+  }
+
+  boolean hasTickets() {
+    assert ticketCount.get() >= 0 : "ticketCount should be >= 0 but was: " + ticketCount.get();
+    return ticketCount.get() != 0;
+  }
+
+  private void innerPurge(DocumentsWriter writer) throws IOException {
+    assert purgeLock.isHeldByCurrentThread();
+    while (true) {
+      final FlushTicket head;
+      final boolean canPublish;
+      synchronized (this) {
+        head = queue.peek();
+        canPublish = head != null && head.canPublish(); // do this synced 
+      }
+      if (canPublish) {
+        try {
+          /*
+           * if we bock on publish -> lock IW -> lock BufferedDeletes we don't block
+           * concurrent segment flushes just because they want to append to the queue.
+           * the downside is that we need to force a purge on fullFlush since ther could
+           * be a ticket still in the queue. 
+           */
+          head.publish(writer);
+        } finally {
+          synchronized (this) {
+            // finally remove the publised ticket from the queue
+            final FlushTicket poll = queue.poll();
+            ticketCount.decrementAndGet();
+            assert poll == head;
+          }
+        }
+      } else {
+        break;
+      }
+    }
+  }
+
+  void forcePurge(DocumentsWriter writer) throws IOException {
+    purgeLock.lock();
+    try {
+      innerPurge(writer);
+    } finally {
+      purgeLock.unlock();
+    }
+  }
+
+  void tryPurge(DocumentsWriter writer) throws IOException {
+    if (purgeLock.tryLock()) {
+      try {
+        innerPurge(writer);
+      } finally {
+        purgeLock.unlock();
+      }
+    }
+  }
+
+  synchronized void clear() {
+    queue.clear();
+    ticketCount.set(0);
+  }
+
+  static abstract class FlushTicket {
+    protected FrozenBufferedDeletes frozenDeletes;
+    protected boolean published = false;
+
+    protected FlushTicket(FrozenBufferedDeletes frozenDeletes) {
+      assert frozenDeletes != null;
+      this.frozenDeletes = frozenDeletes;
+    }
+
+    protected abstract void publish(DocumentsWriter writer) throws IOException;
+    protected abstract boolean canPublish();
+  }
+  
+  static final class GlobalDeletesTicket extends FlushTicket{
+
+    protected GlobalDeletesTicket(FrozenBufferedDeletes frozenDeletes) {
+      super(frozenDeletes);
+    }
+    protected void publish(DocumentsWriter writer) throws IOException {
+      assert !published : "ticket was already publised - can not publish twice";
+      published = true;
+      // its a global ticket - no segment to publish
+      writer.finishFlush(null, frozenDeletes);
+    }
+
+    protected boolean canPublish() {
+      return true;
+    }
+  }
+  
+  static final class SegmentFlushTicket extends FlushTicket {
+    private FlushedSegment segment;
+    private boolean failed = false;
+    
+    protected SegmentFlushTicket(FrozenBufferedDeletes frozenDeletes) {
+      super(frozenDeletes);
+    }
+    
+    protected void publish(DocumentsWriter writer) throws IOException {
+      assert !published : "ticket was already publised - can not publish twice";
+      published = true;
+      writer.finishFlush(segment, frozenDeletes);
+    }
+    
+    protected void setSegment(FlushedSegment segment) {
+      assert !failed;
+      this.segment = segment;
+    }
+    
+    protected void setFailed() {
+      assert segment == null;
+      failed = true;
+    }
+
+    protected boolean canPublish() {
+      return segment != null || failed;
+    }
+  }
+}
\ No newline at end of file