You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/01/16 16:07:04 UTC
svn commit: r1232017 - in
/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index:
DocumentsWriter.java DocumentsWriterFlushQueue.java
Author: simonw
Date: Mon Jan 16 15:07:04 2012
New Revision: 1232017
URL: http://svn.apache.org/viewvc?rev=1232017&view=rev
Log:
LUCENE-3692: DocumentsWriter blocks flushes when applyDeletes takes forever
Added:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
Modified:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1232017&r1=1232016&r2=1232017&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Mon Jan 16 15:07:04 2012
@@ -20,13 +20,12 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
import org.apache.lucene.index.DocumentsWriterPerThread.IndexingChain;
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
@@ -117,7 +116,7 @@ final class DocumentsWriter {
// TODO: cut over to BytesRefHash in BufferedDeletes
volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
- private final TicketQueue ticketQueue = new TicketQueue();
+ private final DocumentsWriterFlushQueue ticketQueue = new DocumentsWriterFlushQueue();
/*
* we preserve changes during a full flush since IW might not checkout before
* we release all changes. NRT Readers otherwise suddenly return true from
@@ -177,12 +176,7 @@ final class DocumentsWriter {
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null && !flushControl.isFullFlush()) {
- synchronized (ticketQueue) {
- ticketQueue.incTicketCount();// first inc the ticket count - freeze opens a window for #anyChanges to fail
- // Freeze and insert the delete flush ticket in the queue
- ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
- applyFlushTickets();
- }
+ ticketQueue.addDeletesAndPurge(this, deleteQueue);
}
indexWriter.applyAllDeletes();
indexWriter.flushCount.incrementAndGet();
@@ -401,7 +395,7 @@ final class DocumentsWriter {
while (flushingDWPT != null) {
maybeMerge = true;
boolean success = false;
- FlushTicket ticket = null;
+ SegmentFlushTicket ticket = null;
try {
assert currentFullFlushDelQueue == null
|| flushingDWPT.deleteQueue == currentFullFlushDelQueue : "expected: "
@@ -422,34 +416,27 @@ final class DocumentsWriter {
* might miss to deletes documents in 'A'.
*/
try {
- synchronized (ticketQueue) {
- // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
- ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
- ticketQueue.incrementAndAdd(ticket);
- }
+ // Each flush is assigned a ticket in the order they acquire the ticketQueue lock
+ ticket = ticketQueue.addFlushTicket(flushingDWPT);
// flush concurrently without locking
final FlushedSegment newSegment = flushingDWPT.flush();
- synchronized (ticketQueue) {
- ticket.segment = newSegment;
- }
+ ticketQueue.addSegment(ticket, newSegment);
// flush was successful once we reached this point - new seg. has been assigned to the ticket!
success = true;
} finally {
if (!success && ticket != null) {
- synchronized (ticketQueue) {
- // In the case of a failure make sure we are making progress and
- // apply all the deletes since the segment flush failed since the flush
- // ticket could hold global deletes see FlushTicket#canPublish()
- ticket.isSegmentFlush = false;
- }
+ // In the case of a failure make sure we are making progress and
+ // apply all the deletes since the segment flush failed since the flush
+ // ticket could hold global deletes see FlushTicket#canPublish()
+ ticketQueue.markTicketFailed(ticket);
}
}
/*
* Now we are done and try to flush the ticket queue if the head of the
* queue has already finished the flush.
*/
- applyFlushTickets();
+ ticketQueue.tryPurge(this);
} finally {
flushControl.doAfterFlush(flushingDWPT);
flushingDWPT.checkAndResetHasAborted();
@@ -476,25 +463,7 @@ final class DocumentsWriter {
return maybeMerge;
}
- private void applyFlushTickets() throws IOException {
- synchronized (ticketQueue) {
- while (true) {
- // Keep publishing eligible flushed segments:
- final FlushTicket head = ticketQueue.peek();
- if (head != null && head.canPublish()) {
- try {
- finishFlush(head.segment, head.frozenDeletes);
- } finally {
- ticketQueue.poll();
- }
- } else {
- break;
- }
- }
- }
- }
-
- private void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
+ void finishFlush(FlushedSegment newSegment, FrozenBufferedDeletes bufferedDeletes)
throws IOException {
// Finish the flushed segment and publish it to IndexWriter
if (newSegment == null) {
@@ -590,13 +559,11 @@ final class DocumentsWriter {
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
- synchronized (ticketQueue) {
- ticketQueue.incTicketCount(); // first inc the ticket count - freeze opens a window for #anyChanges to fail
- ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
- }
- applyFlushTickets();
+ ticketQueue.addDeletesAndPurge(this, flushingDeleteQueue);
+ } else {
+ ticketQueue.forcePurge(this);
}
- assert !flushingDeleteQueue.anyChanges();
+ assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
}
@@ -621,61 +588,8 @@ final class DocumentsWriter {
}
- static final class FlushTicket {
- final FrozenBufferedDeletes frozenDeletes;
- /* access to non-final members must be synchronized on DW#ticketQueue */
- FlushedSegment segment;
- boolean isSegmentFlush;
-
- FlushTicket(FrozenBufferedDeletes frozenDeletes, boolean isSegmentFlush) {
- this.frozenDeletes = frozenDeletes;
- this.isSegmentFlush = isSegmentFlush;
- }
-
- boolean canPublish() {
- return (!isSegmentFlush || segment != null);
- }
- }
- static final class TicketQueue {
- private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
- final AtomicInteger ticketCount = new AtomicInteger();
-
- void incTicketCount() {
- ticketCount.incrementAndGet();
- }
-
- public boolean hasTickets() {
- assert ticketCount.get() >= 0;
- return ticketCount.get() != 0;
- }
-
- void incrementAndAdd(FlushTicket ticket) {
- incTicketCount();
- add(ticket);
- }
-
- void add(FlushTicket ticket) {
- queue.add(ticket);
- }
-
- FlushTicket peek() {
- return queue.peek();
- }
-
- FlushTicket poll() {
- try {
- return queue.poll();
- } finally {
- ticketCount.decrementAndGet();
- }
- }
-
- void clear() {
- queue.clear();
- ticketCount.set(0);
- }
- }
+
// use by IW during close to assert all DWPT are inactive after final flush
boolean assertNoActiveDWPT() {
Added: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java?rev=1232017&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java (added)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriterFlushQueue.java Mon Jan 16 15:07:04 2012
@@ -0,0 +1,211 @@
+package org.apache.lucene.index;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
+
+
+/**
+ * @lucene.internal
+ */
+public class DocumentsWriterFlushQueue {
+ private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
+ // we track tickets separately since count must be present even before the ticket is
+ // constructed ie. queue.size would not reflect it.
+ private final AtomicInteger ticketCount = new AtomicInteger();
+ private final ReentrantLock purgeLock = new ReentrantLock();
+
+ synchronized void addDeletesAndPurge(DocumentsWriter writer,
+ DocumentsWriterDeleteQueue deleteQueue) throws IOException {
+ incTickets();// first inc the ticket count - freeze opens
+ // a window for #anyChanges to fail
+ boolean success = false;
+ try {
+ queue.add(new GlobalDeletesTicket(deleteQueue.freezeGlobalBuffer(null)));
+ success = true;
+ } finally {
+ if (!success) {
+ decTickets();
+ }
+ }
+ forcePurge(writer);
+ }
+
+ private void incTickets() {
+ int numTickets = ticketCount.incrementAndGet();
+ assert numTickets > 0;
+ }
+
+ private void decTickets() {
+ int numTickets = ticketCount.decrementAndGet();
+ assert numTickets >= 0;
+ }
+
+ synchronized SegmentFlushTicket addFlushTicket(DocumentsWriterPerThread dwpt) {
+ // Each flush is assigned a ticket in the order they acquire the ticketQueue
+ // lock
+ incTickets();
+ boolean success = false;
+ try {
+ // prepare flush freezes the global deletes - do in synced block!
+ final SegmentFlushTicket ticket = new SegmentFlushTicket(dwpt.prepareFlush());
+ queue.add(ticket);
+ success = true;
+ return ticket;
+ } finally {
+ if (!success) {
+ decTickets();
+ }
+ }
+ }
+
+ synchronized void addSegment(SegmentFlushTicket ticket, FlushedSegment segment) {
+ // the actual flush is done asynchronously and once done the FlushedSegment
+ // is passed to the flush ticket
+ ticket.setSegment(segment);
+ }
+
+ synchronized void markTicketFailed(SegmentFlushTicket ticket) {
+ // to free the queue we mark tickets as failed just to clean up the queue.
+ ticket.setFailed();
+ }
+
+ boolean hasTickets() {
+ assert ticketCount.get() >= 0 : "ticketCount should be >= 0 but was: " + ticketCount.get();
+ return ticketCount.get() != 0;
+ }
+
+ private void innerPurge(DocumentsWriter writer) throws IOException {
+ assert purgeLock.isHeldByCurrentThread();
+ while (true) {
+ final FlushTicket head;
+ final boolean canPublish;
+ synchronized (this) {
+ head = queue.peek();
+ canPublish = head != null && head.canPublish(); // do this synced
+ }
+ if (canPublish) {
+ try {
+ /*
+ * if we bock on publish -> lock IW -> lock BufferedDeletes we don't block
+ * concurrent segment flushes just because they want to append to the queue.
+ * the downside is that we need to force a purge on fullFlush since ther could
+ * be a ticket still in the queue.
+ */
+ head.publish(writer);
+ } finally {
+ synchronized (this) {
+ // finally remove the publised ticket from the queue
+ final FlushTicket poll = queue.poll();
+ ticketCount.decrementAndGet();
+ assert poll == head;
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ void forcePurge(DocumentsWriter writer) throws IOException {
+ purgeLock.lock();
+ try {
+ innerPurge(writer);
+ } finally {
+ purgeLock.unlock();
+ }
+ }
+
+ void tryPurge(DocumentsWriter writer) throws IOException {
+ if (purgeLock.tryLock()) {
+ try {
+ innerPurge(writer);
+ } finally {
+ purgeLock.unlock();
+ }
+ }
+ }
+
+ synchronized void clear() {
+ queue.clear();
+ ticketCount.set(0);
+ }
+
+ static abstract class FlushTicket {
+ protected FrozenBufferedDeletes frozenDeletes;
+ protected boolean published = false;
+
+ protected FlushTicket(FrozenBufferedDeletes frozenDeletes) {
+ assert frozenDeletes != null;
+ this.frozenDeletes = frozenDeletes;
+ }
+
+ protected abstract void publish(DocumentsWriter writer) throws IOException;
+ protected abstract boolean canPublish();
+ }
+
+ static final class GlobalDeletesTicket extends FlushTicket{
+
+ protected GlobalDeletesTicket(FrozenBufferedDeletes frozenDeletes) {
+ super(frozenDeletes);
+ }
+ protected void publish(DocumentsWriter writer) throws IOException {
+ assert !published : "ticket was already publised - can not publish twice";
+ published = true;
+ // its a global ticket - no segment to publish
+ writer.finishFlush(null, frozenDeletes);
+ }
+
+ protected boolean canPublish() {
+ return true;
+ }
+ }
+
+ static final class SegmentFlushTicket extends FlushTicket {
+ private FlushedSegment segment;
+ private boolean failed = false;
+
+ protected SegmentFlushTicket(FrozenBufferedDeletes frozenDeletes) {
+ super(frozenDeletes);
+ }
+
+ protected void publish(DocumentsWriter writer) throws IOException {
+ assert !published : "ticket was already publised - can not publish twice";
+ published = true;
+ writer.finishFlush(segment, frozenDeletes);
+ }
+
+ protected void setSegment(FlushedSegment segment) {
+ assert !failed;
+ this.segment = segment;
+ }
+
+ protected void setFailed() {
+ assert segment == null;
+ failed = true;
+ }
+
+ protected boolean canPublish() {
+ return segment != null || failed;
+ }
+ }
+}
\ No newline at end of file