You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2011/11/04 21:16:08 UTC
svn commit: r1197742 - in /lucene/dev/trunk/lucene/src:
java/org/apache/lucene/index/DocumentsWriter.java
java/org/apache/lucene/index/IndexWriter.java
test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java
Author: simonw
Date: Fri Nov 4 20:16:08 2011
New Revision: 1197742
URL: http://svn.apache.org/viewvc?rev=1197742&view=rev
Log:
LUCENE-3551: IW#nrtIsCurrent returns false-positive when full flush runs concurrently
Added:
lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java (with props)
Modified:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=1197742&r1=1197741&r2=1197742&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Fri Nov 4 20:16:08 2011
@@ -117,7 +117,14 @@ final class DocumentsWriter {
// TODO: cut over to BytesRefHash in BufferedDeletes
volatile DocumentsWriterDeleteQueue deleteQueue = new DocumentsWriterDeleteQueue();
- private final Queue<FlushTicket> ticketQueue = new LinkedList<DocumentsWriter.FlushTicket>();
+ private final TicketQueue ticketQueue = new TicketQueue();
+ /*
+ * Remembers that changes were pending when the current full flush started,
+ * since IW might not have checked them out by the time we release them here.
+ * Without it, NRT readers could suddenly return true from isCurrent while
+ * those changes are still being committed. See #anyChanges() & #flushAllThreads
+ */
+ private volatile boolean pendingChangesInCurrentFullFlush;
private Collection<String> abortedFiles; // List of files that were written before last abort()
@@ -170,6 +177,7 @@ final class DocumentsWriter {
private void applyAllDeletes(DocumentsWriterDeleteQueue deleteQueue) throws IOException {
if (deleteQueue != null && !flushControl.isFullFlush()) {
synchronized (ticketQueue) {
+ ticketQueue.incTicketCount();// first inc the ticket count - freeze opens a window for #anyChanges to fail
// Freeze and insert the delete flush ticket in the queue
ticketQueue.add(new FlushTicket(deleteQueue.freezeGlobalBuffer(null), false));
applyFlushTickets();
@@ -256,9 +264,22 @@ final class DocumentsWriter {
}
boolean anyChanges() {
- return numDocsInRAM.get() != 0 || anyDeletions();
+ if (infoStream != null) {
+ message("docWriter: anyChanges? numDocsInRam=" + numDocsInRAM.get()
+ + " deletes=" + anyDeletions() + " hasTickets:"
+ + ticketQueue.hasTickets() + " pendingChangesInFullFlush: "
+ + pendingChangesInCurrentFullFlush);
+ }
+ /*
+ * Changes normally live either in a DWPT or in the deleteQueue.
+ * However, while deletes and/or a DWPT are being flushed there is
+ * a window in which all changes sit only in the ticket queue and
+ * are not yet published to the IW, i.e. we must also check whether
+ * the ticket queue has any tickets.
+ */
+ return numDocsInRAM.get() != 0 || anyDeletions() || ticketQueue.hasTickets() || pendingChangesInCurrentFullFlush;
}
-
+
public int getBufferedDeleteTermsSize() {
return deleteQueue.getBufferedDeleteTermsSize();
}
@@ -283,7 +304,7 @@ final class DocumentsWriter {
if (flushControl.anyStalledThreads() || flushControl.numQueuedFlushes() > 0) {
// Help out flushing any queued DWPTs so we can un-stall:
if (infoStream != null) {
- message("DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
+ message("docWriter: DocumentsWriter has queued dwpt; will hijack this thread to flush pending segment(s)");
}
do {
// Try pick up pending threads here if possible
@@ -417,7 +438,7 @@ final class DocumentsWriter {
synchronized (ticketQueue) {
// Each flush is assigned a ticket in the order they acquire the ticketQueue lock
ticket = new FlushTicket(flushingDWPT.prepareFlush(), true);
- ticketQueue.add(ticket);
+ ticketQueue.incrementAndAdd(ticket);
}
// flush concurrently without locking
@@ -474,8 +495,11 @@ final class DocumentsWriter {
// Keep publishing eligible flushed segments:
final FlushTicket head = ticketQueue.peek();
if (head != null && head.canPublish()) {
- ticketQueue.poll();
- finishFlush(head.segment, head.frozenDeletes);
+ try {
+ finishFlush(head.segment, head.frozenDeletes);
+ } finally {
+ ticketQueue.poll();
+ }
} else {
break;
}
@@ -489,7 +513,7 @@ final class DocumentsWriter {
if (newSegment == null) {
assert bufferedDeletes != null;
if (bufferedDeletes != null && bufferedDeletes.any()) {
- indexWriter.bufferedDeletesStream.push(bufferedDeletes);
+ indexWriter.publishFrozenDeletes(bufferedDeletes);
if (infoStream != null) {
message("flush: push buffered deletes: " + bufferedDeletes);
}
@@ -535,6 +559,7 @@ final class DocumentsWriter {
// for asserts
private volatile DocumentsWriterDeleteQueue currentFullFlushDelQueue = null;
+
// for asserts
private synchronized boolean setFlushingDeleteQueue(DocumentsWriterDeleteQueue session) {
currentFullFlushDelQueue = session;
@@ -554,6 +579,7 @@ final class DocumentsWriter {
}
synchronized (this) {
+ pendingChangesInCurrentFullFlush = anyChanges();
flushingDeleteQueue = deleteQueue;
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
@@ -573,11 +599,12 @@ final class DocumentsWriter {
}
// If a concurrent flush is still in flight wait for it
flushControl.waitForFlush();
- if (!anythingFlushed) { // apply deletes if we did not flush any document
+ if (!anythingFlushed && flushingDeleteQueue.anyChanges()) { // apply deletes if we did not flush any document
if (infoStream != null) {
message(Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
synchronized (ticketQueue) {
+ ticketQueue.incTicketCount(); // first inc the ticket count - freeze opens a window for #anyChanges to fail
ticketQueue.add(new FlushTicket(flushingDeleteQueue.freezeGlobalBuffer(null), false));
}
applyFlushTickets();
@@ -590,16 +617,21 @@ final class DocumentsWriter {
}
final void finishFullFlush(boolean success) {
- if (infoStream != null) {
- message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
- }
- assert setFlushingDeleteQueue(null);
- if (success) {
- // Release the flush lock
- flushControl.finishFullFlush();
- } else {
- flushControl.abortFullFlushes();
+ try {
+ if (infoStream != null) {
+ message(Thread.currentThread().getName() + " finishFullFlush success=" + success);
+ }
+ assert setFlushingDeleteQueue(null);
+ if (success) {
+ // Release the flush lock
+ flushControl.finishFullFlush();
+ } else {
+ flushControl.abortFullFlushes();
+ }
+ } finally {
+ pendingChangesInCurrentFullFlush = false;
}
+
}
static final class FlushTicket {
@@ -618,6 +650,46 @@ final class DocumentsWriter {
}
}
+ static final class TicketQueue {
+ private final Queue<FlushTicket> queue = new LinkedList<FlushTicket>();
+ final AtomicInteger ticketCount = new AtomicInteger();
+
+ void incTicketCount() {
+ ticketCount.incrementAndGet();
+ }
+
+ public boolean hasTickets() {
+ assert ticketCount.get() >= 0;
+ return ticketCount.get() != 0;
+ }
+
+ void incrementAndAdd(FlushTicket ticket) {
+ incTicketCount();
+ add(ticket);
+ }
+
+ void add(FlushTicket ticket) {
+ queue.add(ticket);
+ }
+
+ FlushTicket peek() {
+ return queue.peek();
+ }
+
+ FlushTicket poll() {
+ try {
+ return queue.poll();
+ } finally {
+ ticketCount.decrementAndGet();
+ }
+ }
+
+ void clear() {
+ queue.clear();
+ ticketCount.set(0);
+ }
+ }
+
// use by IW during close to assert all DWPT are inactive after final flush
boolean assertNoActiveDWPT() {
Iterator<ThreadState> activePerThreadsIterator = perThreadPool.getAllPerThreadsIterator();
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=1197742&r1=1197741&r2=1197742&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/IndexWriter.java Fri Nov 4 20:16:08 2011
@@ -2354,6 +2354,13 @@ public class IndexWriter implements Clos
return newSegment;
}
+ synchronized void publishFrozenDeletes(FrozenBufferedDeletes packet) throws IOException {
+ assert packet != null && packet.any();
+ synchronized (bufferedDeletesStream) {
+ bufferedDeletesStream.push(packet);
+ }
+ }
+
/**
* Atomically adds the segment private delete packet and publishes the flushed
* segments SegmentInfo to the index writer. NOTE: use
@@ -2984,14 +2991,14 @@ public class IndexWriter implements Clos
final boolean anySegmentFlushed;
synchronized (fullFlushLock) {
+ boolean flushSuccess = false;
try {
anySegmentFlushed = docWriter.flushAllThreads();
- success = true;
+ flushSuccess = true;
} finally {
- docWriter.finishFullFlush(success);
+ docWriter.finishFullFlush(flushSuccess);
}
}
- success = false;
synchronized(this) {
maybeApplyDeletes(applyAllDeletes);
doAfterFlush();
@@ -4074,6 +4081,10 @@ public class IndexWriter implements Clos
synchronized boolean nrtIsCurrent(SegmentInfos infos) {
//System.out.println("IW.nrtIsCurrent " + (infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any()));
ensureOpen();
+ if (infoStream != null) {
+ message("nrtIsCurrent: infoVersion matches: " + (infos.version == segmentInfos.version) + " DW changes: " + docWriter.anyChanges() + " BD changes: "+bufferedDeletesStream.any());
+
+ }
return infos.version == segmentInfos.version && !docWriter.anyChanges() && !bufferedDeletesStream.any();
}
Added: lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java?rev=1197742&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java (added)
+++ lucene/dev/trunk/lucene/src/test/org/apache/lucene/index/TestIndexWriterNRTIsCurrent.java Fri Nov 4 20:16:08 2011
@@ -0,0 +1,203 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestIndexWriterNRTIsCurrent extends LuceneTestCase {
+
+ public static class ReaderHolder {
+ volatile IndexReader reader;
+ volatile boolean stop = false;
+ }
+
+ public void testIsCurrentWithThreads() throws CorruptIndexException,
+ LockObtainFailedException, IOException, InterruptedException {
+ Directory dir = newDirectory();
+ IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT,
+ new MockAnalyzer(random));
+ IndexWriter writer = new IndexWriter(dir, conf);
+ if (VERBOSE) {
+ writer.setInfoStream(System.out);
+ }
+ ReaderHolder holder = new ReaderHolder();
+ ReaderThread[] threads = new ReaderThread[atLeast(3)];
+ final CountDownLatch latch = new CountDownLatch(1);
+ WriterThread writerThread = new WriterThread(holder, writer,
+ atLeast(500), random, latch);
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new ReaderThread(holder, latch);
+ threads[i].start();
+ }
+ writerThread.start();
+
+ writerThread.join();
+ boolean failed = writerThread.failed != null;
+ if (failed)
+ writerThread.failed.printStackTrace();
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join();
+ if (threads[i].failed != null) {
+ threads[i].failed.printStackTrace();
+ failed = true;
+ }
+ }
+ assertFalse(failed);
+ writer.close();
+ dir.close();
+
+ }
+
+ public static class WriterThread extends Thread {
+ private final ReaderHolder holder;
+ private final IndexWriter writer;
+ private final int numOps;
+ private final Random random;
+ private boolean countdown = true;
+ private final CountDownLatch latch;
+ Throwable failed;
+
+ WriterThread(ReaderHolder holder, IndexWriter writer, int numOps,
+ Random random, CountDownLatch latch) {
+ super();
+ this.holder = holder;
+ this.writer = writer;
+ this.numOps = numOps;
+ this.random = random;
+ this.latch = latch;
+ }
+
+ public void run() {
+ IndexReader currentReader = null;
+ try {
+ Document doc = new Document();
+ doc.add(new Field("id", "1", TextField.TYPE_UNSTORED));
+ writer.addDocument(doc);
+ holder.reader = currentReader = writer.getReader(true);
+ Term term = new Term("id");
+ for (int i = 0; i < numOps && !holder.stop; i++) {
+ float nextOp = random.nextFloat();
+ if (nextOp < 0.3) {
+ term.set("id", "1");
+ writer.updateDocument(term, doc);
+ } else if (nextOp < 0.5) {
+ writer.addDocument(doc);
+ } else {
+ term.set("id", "1");
+ writer.deleteDocuments(term);
+ }
+ if (holder.reader != currentReader) {
+ holder.reader = currentReader;
+ if (countdown) {
+ countdown = false;
+ latch.countDown();
+ }
+ }
+ if (random.nextBoolean()) {
+ writer.commit();
+ final IndexReader newReader = IndexReader
+ .openIfChanged(currentReader);
+ if (newReader != null) {
+ currentReader.decRef();
+ currentReader = newReader;
+ }
+ if (currentReader.numDocs() == 0) {
+ writer.addDocument(doc);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ failed = e;
+ } finally {
+ holder.reader = null;
+ if (countdown) {
+ latch.countDown();
+ }
+ if (currentReader != null) {
+ try {
+ currentReader.decRef();
+ } catch (IOException e) {
+ }
+ }
+ }
+ if (VERBOSE) {
+ System.out.println("writer stopped - forced by reader: " + holder.stop);
+ }
+ }
+
+ }
+
+ public static final class ReaderThread extends Thread {
+ private final ReaderHolder holder;
+ private final CountDownLatch latch;
+ Throwable failed;
+
+ ReaderThread(ReaderHolder holder, CountDownLatch latch) {
+ super();
+ this.holder = holder;
+ this.latch = latch;
+ }
+
+ public void run() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ failed = e;
+ return;
+ }
+ IndexReader reader;
+ while ((reader = holder.reader) != null) {
+ if (reader.tryIncRef()) {
+ try {
+ boolean current = reader.isCurrent();
+ if (VERBOSE) {
+ System.out.println("Thread: " + Thread.currentThread() + " Reader: " + reader + " isCurrent:" + current);
+ }
+
+ assertFalse(current);
+ } catch (Throwable e) {
+ if (VERBOSE) {
+ System.out.println("FAILED Thread: " + Thread.currentThread() + " Reader: " + reader + " isCurrent: false");
+ }
+ failed = e;
+ holder.stop = true;
+ return;
+ } finally {
+ try {
+ reader.decRef();
+ } catch (IOException e) {
+ if (failed == null) {
+ failed = e;
+ }
+ return;
+ }
+ }
+ }
+ }
+ }
+ }
+}