You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/06/11 19:38:00 UTC
[01/15] lucene-solr:branch_6x: initial sequence numbers patch
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6x 270d3859a -> 00584579b
initial sequence numbers patch
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/88dd1aad
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/88dd1aad
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/88dd1aad
Branch: refs/heads/branch_6x
Commit: 88dd1aad774bda9a628bd5807d5d376a18b6fb7f
Parents: 270d385
Author: Mike McCandless <mi...@apache.org>
Authored: Tue May 24 10:20:30 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:00 2016 -0400
----------------------------------------------------------------------
.../apache/lucene/index/BufferedUpdates.java | 2 +-
.../apache/lucene/index/DocumentsWriter.java | 37 ++-
.../index/DocumentsWriterDeleteQueue.java | 49 ++--
.../index/DocumentsWriterFlushControl.java | 18 +-
.../lucene/index/DocumentsWriterPerThread.java | 17 +-
.../index/DocumentsWriterPerThreadPool.java | 15 -
.../org/apache/lucene/index/IndexWriter.java | 74 ++++-
.../lucene/index/StandardDirectoryReader.java | 2 +-
.../lucene/index/TrackingIndexWriter.java | 1 +
.../org/apache/lucene/index/TwoPhaseCommit.java | 5 +-
.../index/TestIndexingSequenceNumbers.java | 293 +++++++++++++++++++
11 files changed, 437 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
index 6225337..b59c616 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
@@ -34,7 +34,7 @@ import org.apache.lucene.util.RamUsageEstimator;
* single segment. This is used to hold buffered pending
* deletes and updates against the to-be-flushed segment. Once the
* deletes and updates are pushed (on flush in DocumentsWriter), they
- * are converted to a FrozenDeletes instance. */
+ * are converted to a FrozenBufferedUpdates instance. */
// NOTE: instances of this class are accessed either via a private
// instance on DocumentWriterPerThread, or via sync'd code by
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index d5c1597..bbbef71 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -141,18 +141,22 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
deleteQueue.addDelete(queries);
flushControl.doOnDelete();
+ // nocommit long
return applyAllDeletes(deleteQueue);
}
// TODO: we could check w/ FreqProxTermsWriter: if the
// term doesn't exist, don't bother buffering into the
// per-DWPT map (but still must go into the global map)
- synchronized boolean deleteTerms(final Term... terms) throws IOException {
+ synchronized long deleteTerms(final Term... terms) throws IOException {
// TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
- deleteQueue.addDelete(terms);
+ long seqNo = deleteQueue.addDelete(terms);
flushControl.doOnDelete();
- return applyAllDeletes( deleteQueue);
+ if (applyAllDeletes(deleteQueue)) {
+ seqNo = -seqNo;
+ }
+ return seqNo;
}
synchronized boolean updateDocValues(DocValuesUpdate... updates) throws IOException {
@@ -429,7 +433,7 @@ final class DocumentsWriter implements Closeable, Accountable {
return postUpdate(flushingDWPT, hasEvents);
}
- boolean updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
+ long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
final Term delTerm) throws IOException, AbortingException {
boolean hasEvents = preUpdate();
@@ -437,6 +441,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
+ final long seqno;
try {
// This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
@@ -446,7 +451,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
- dwpt.updateDocument(doc, analyzer, delTerm);
+ seqno = dwpt.updateDocument(doc, analyzer, delTerm);
} catch (AbortingException ae) {
flushControl.doOnAbort(perThread);
dwpt.abort();
@@ -463,7 +468,11 @@ final class DocumentsWriter implements Closeable, Accountable {
perThreadPool.release(perThread);
}
- return postUpdate(flushingDWPT, hasEvents);
+ if (postUpdate(flushingDWPT, hasEvents)) {
+ return -seqno;
+ } else {
+ return seqno;
+ }
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {
@@ -587,20 +596,22 @@ final class DocumentsWriter implements Closeable, Accountable {
* two stage operation; the caller must ensure (in try/finally) that finishFlush
* is called after this method, to release the flush lock in DWFlushControl
*/
- boolean flushAllThreads()
+ long flushAllThreads()
throws IOException, AbortingException {
final DocumentsWriterDeleteQueue flushingDeleteQueue;
if (infoStream.isEnabled("DW")) {
infoStream.message("DW", "startFullFlush");
}
-
+
+ long seqNo;
+
synchronized (this) {
pendingChangesInCurrentFullFlush = anyChanges();
flushingDeleteQueue = deleteQueue;
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
* delete queue */
- flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
+ seqNo = flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
assert setFlushingDeleteQueue(flushingDeleteQueue);
}
assert currentFullFlushDelQueue != null;
@@ -620,13 +631,17 @@ final class DocumentsWriter implements Closeable, Accountable {
infoStream.message("DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
}
ticketQueue.addDeletes(flushingDeleteQueue);
- }
+ }
ticketQueue.forcePurge(writer);
assert !flushingDeleteQueue.anyChanges() && !ticketQueue.hasTickets();
} finally {
assert flushingDeleteQueue == currentFullFlushDelQueue;
}
- return anythingFlushed;
+ if (anythingFlushed) {
+ return -seqNo;
+ } else {
+ return seqNo;
+ }
}
void finishFullFlush(IndexWriter indexWriter, boolean success) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index 04a0cfb..5d0e83d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -17,6 +17,7 @@
package org.apache.lucene.index;
import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
@@ -76,23 +77,28 @@ final class DocumentsWriterDeleteQueue implements Accountable {
private final DeleteSlice globalSlice;
private final BufferedUpdates globalBufferedUpdates;
+ private long gen;
// only acquired to update the global deletes, pkg-private for access by tests:
final ReentrantLock globalBufferLock = new ReentrantLock();
final long generation;
+
+ final AtomicLong seqNo;
DocumentsWriterDeleteQueue() {
- this(0);
+ // seqNo must start at 1 because some APIs negate this to encode a boolean
+ this(0, 1);
}
- DocumentsWriterDeleteQueue(long generation) {
- this(new BufferedUpdates(), generation);
+ DocumentsWriterDeleteQueue(long generation, long startSeqNo) {
+ this(new BufferedUpdates(), generation, startSeqNo);
}
- DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation) {
+ DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
this.globalBufferedUpdates = globalBufferedUpdates;
this.generation = generation;
+ this.seqNo = new AtomicLong(startSeqNo);
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
@@ -101,28 +107,31 @@ final class DocumentsWriterDeleteQueue implements Accountable {
globalSlice = new DeleteSlice(tail);
}
- void addDelete(Query... queries) {
- add(new QueryArrayNode(queries));
+ long addDelete(Query... queries) {
+ long seqNo = add(new QueryArrayNode(queries));
tryApplyGlobalSlice();
+ return seqNo;
}
- void addDelete(Term... terms) {
- add(new TermArrayNode(terms));
+ long addDelete(Term... terms) {
+ long seqNo = add(new TermArrayNode(terms));
tryApplyGlobalSlice();
+ return seqNo;
}
- void addDocValuesUpdates(DocValuesUpdate... updates) {
- add(new DocValuesUpdatesNode(updates));
+ long addDocValuesUpdates(DocValuesUpdate... updates) {
+ long seqNo = add(new DocValuesUpdatesNode(updates));
tryApplyGlobalSlice();
+ return seqNo;
}
/**
* invariant for document update
*/
- void add(Term term, DeleteSlice slice) {
+ long add(Term term, DeleteSlice slice) {
final TermNode termNode = new TermNode(term);
// System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
- add(termNode);
+ long seqNo = add(termNode);
/*
* this is an update request where the term is the updated documents
* delTerm. in that case we need to guarantee that this insert is atomic
@@ -137,9 +146,12 @@ final class DocumentsWriterDeleteQueue implements Accountable {
assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add";
tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe
// we can do it just every n times or so?
+
+ return seqNo;
}
- void add(Node<?> item) {
+ // nocommit can we remove the sync'd
+ synchronized long add(Node<?> newNode) {
/*
* this non-blocking / 'wait-free' linked list add was inspired by Apache
* Harmony's ConcurrentLinkedQueue Implementation.
@@ -157,18 +169,18 @@ final class DocumentsWriterDeleteQueue implements Accountable {
tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail
} else {
/*
- * we are in quiescent state and can try to insert the item to the
+ * we are in quiescent state and can try to insert the new node to the
* current tail if we fail to insert we just retry the operation since
* somebody else has already added its item
*/
- if (currentTail.casNext(null, item)) {
+ if (currentTail.casNext(null, newNode)) {
/*
* now that we are done we need to advance the tail while another
* thread could have advanced it already so we can ignore the return
* type of this CAS call
*/
- tailUpdater.compareAndSet(this, currentTail, item);
- return;
+ tailUpdater.compareAndSet(this, currentTail, newNode);
+ return seqNo.getAndIncrement();
}
}
}
@@ -230,8 +242,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
}
// System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
- final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(
- globalBufferedUpdates, false);
+ final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(globalBufferedUpdates, false);
globalBufferedUpdates.clear();
return packet;
} finally {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 38547e6..ec390d9 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -141,8 +141,7 @@ final class DocumentsWriterFlushControl implements Accountable {
}
private void commitPerThreadBytes(ThreadState perThread) {
- final long delta = perThread.dwpt.bytesUsed()
- - perThread.bytesUsed;
+ final long delta = perThread.dwpt.bytesUsed() - perThread.bytesUsed;
perThread.bytesUsed += delta;
/*
* We need to differentiate here if we are pending since setFlushPending
@@ -167,8 +166,7 @@ final class DocumentsWriterFlushControl implements Accountable {
return true;
}
- synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread,
- boolean isUpdate) {
+ synchronized DocumentsWriterPerThread doAfterDocument(ThreadState perThread, boolean isUpdate) {
try {
commitPerThreadBytes(perThread);
if (!perThread.flushPending) {
@@ -471,8 +469,9 @@ final class DocumentsWriterFlushControl implements Accountable {
}
}
- void markForFullFlush() {
+ long markForFullFlush() {
final DocumentsWriterDeleteQueue flushingQueue;
+ long seqNo;
synchronized (this) {
assert !fullFlush : "called DWFC#markForFullFlush() while full flush is still running";
assert fullFlushBuffer.isEmpty() : "full flush buffer should be empty: "+ fullFlushBuffer;
@@ -480,7 +479,13 @@ final class DocumentsWriterFlushControl implements Accountable {
flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
- DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1);
+ //System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
+ seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
+
+ // nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would
+ // have to get this next delete queue?
+ DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
+
documentsWriter.deleteQueue = newQueue;
}
final int limit = perThreadPool.getActiveThreadStateCount();
@@ -520,6 +525,7 @@ final class DocumentsWriterFlushControl implements Accountable {
updateStallState();
}
assert assertActiveDeleteQueue(documentsWriter.deleteQueue);
+ return seqNo;
}
private boolean assertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 3e8a227..ab00662 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -175,7 +175,6 @@ class DocumentsWriterPerThread {
intBlockAllocator = new IntBlockAllocator(bytesUsed);
this.deleteQueue = deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
- pendingUpdates.clear();
deleteSlice = deleteQueue.newSlice();
segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
@@ -210,7 +209,7 @@ class DocumentsWriterPerThread {
}
}
- public void updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
+ public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
testPoint("DocumentsWriterPerThread addDocument start");
assert deleteQueue != null;
reserveOneDoc();
@@ -241,7 +240,8 @@ class DocumentsWriterPerThread {
numDocsInRAM++;
}
}
- finishDocument(delTerm);
+
+ return finishDocument(delTerm);
}
public int updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
@@ -291,6 +291,8 @@ class DocumentsWriterPerThread {
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
}
+ // nocommit return seqNo here
+
} finally {
if (!allDocsIndexed && !aborted) {
// the iterator threw an exception that is not aborting
@@ -308,7 +310,7 @@ class DocumentsWriterPerThread {
return docCount;
}
- private void finishDocument(Term delTerm) {
+ private long finishDocument(Term delTerm) {
/*
* here we actually finish the document in two steps 1. push the delete into
* the queue and update our slice. 2. increment the DWPT private document
@@ -318,11 +320,14 @@ class DocumentsWriterPerThread {
* since we updated the slice the last time.
*/
boolean applySlice = numDocsInRAM != 0;
+ long seqNo;
if (delTerm != null) {
- deleteQueue.add(delTerm, deleteSlice);
+ seqNo = deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
applySlice &= deleteQueue.updateSlice(deleteSlice);
+ // nocommit we don't need to increment here?
+ seqNo = deleteQueue.seqNo.get();
}
if (applySlice) {
@@ -331,6 +336,8 @@ class DocumentsWriterPerThread {
deleteSlice.reset();
}
++numDocsInRAM;
+
+ return seqNo;
}
// Buffer a specific docID for deletion. Currently only
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 63d2e96..0b0ac84 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -229,19 +229,4 @@ final class DocumentsWriterPerThreadPool {
synchronized int getMaxThreadStates() {
return threadStates.size();
}
-
- /**
- * Returns the ThreadState with the minimum estimated number of threads
- * waiting to acquire its lock or <code>null</code> if no {@link ThreadState}
- * is yet visible to the calling thread.
- */
- ThreadState minContendedThreadState() {
- ThreadState minThreadState = null;
- for (ThreadState state : threadStates) {
- if (minThreadState == null || state.getQueueLength() < minThreadState.getQueueLength()) {
- minThreadState = state;
- }
- }
- return minThreadState;
- }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 159f591..1682135 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -266,6 +266,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
private List<SegmentCommitInfo> rollbackSegments; // list of segmentInfo we will fallback to if the commit fails
volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
+ volatile long pendingSeqNo;
volatile long pendingCommitChangeCount;
private Collection<String> filesToCommit;
@@ -425,7 +426,14 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false;
synchronized (fullFlushLock) {
try {
- anyChanges = docWriter.flushAllThreads();
+ // nocommit should we make this available in the returned NRT reader?
+ long seqNo = docWriter.flushAllThreads();
+ if (seqNo < 0) {
+ anyChanges = true;
+ seqNo = -seqNo;
+ } else {
+ anyChanges = false;
+ }
if (!anyChanges) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
@@ -1283,8 +1291,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public void addDocument(Iterable<? extends IndexableField> doc) throws IOException {
- updateDocument(null, doc);
+ public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
+ return updateDocument(null, doc);
}
/**
@@ -1447,14 +1455,20 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public void deleteDocuments(Term... terms) throws IOException {
+ public long deleteDocuments(Term... terms) throws IOException {
ensureOpen();
try {
- if (docWriter.deleteTerms(terms)) {
+ long seqNo = docWriter.deleteTerms(terms);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
processEvents(true, false);
}
+ return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteDocuments(Term..)");
+
+ // dead code but javac disagrees:
+ return -1;
}
}
@@ -1500,15 +1514,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public void updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
+ public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
ensureOpen();
try {
boolean success = false;
try {
- if (docWriter.updateDocument(doc, analyzer, term)) {
+ long seqNo = docWriter.updateDocument(doc, analyzer, term);
+ if (seqNo < 0) {
+ seqNo = - seqNo;
processEvents(true, false);
}
success = true;
+ return seqNo;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
@@ -1518,6 +1535,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocument");
+
+ // dead code but javac disagrees:
+ return -1;
}
}
@@ -2807,12 +2827,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* will internally call prepareCommit.
*/
@Override
- public final void prepareCommit() throws IOException {
+ public final long prepareCommit() throws IOException {
ensureOpen();
- prepareCommitInternal(config.getMergePolicy());
+ pendingSeqNo = prepareCommitInternal(config.getMergePolicy());
+ return pendingSeqNo;
}
- private void prepareCommitInternal(MergePolicy mergePolicy) throws IOException {
+ private long prepareCommitInternal(MergePolicy mergePolicy) throws IOException {
startCommitTime = System.nanoTime();
synchronized(commitLock) {
ensureOpen(false);
@@ -2833,6 +2854,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
testPoint("startDoFlush");
SegmentInfos toCommit = null;
boolean anySegmentsFlushed = false;
+ long seqNo;
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
@@ -2844,7 +2866,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean flushSuccess = false;
boolean success = false;
try {
- anySegmentsFlushed = docWriter.flushAllThreads();
+ seqNo = docWriter.flushAllThreads();
+ if (seqNo < 0) {
+ anySegmentsFlushed = true;
+ seqNo = -seqNo;
+ }
if (!anySegmentsFlushed) {
// prevent double increment since docWriter#doFlush increments the flushcount
// if we flushed anything.
@@ -2898,6 +2924,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "prepareCommit");
+
+ // dead code but javac disagrees:
+ seqNo = -1;
}
boolean success = false;
@@ -2907,6 +2936,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
startCommit(toCommit);
success = true;
+ return seqNo;
} finally {
if (!success) {
synchronized (this) {
@@ -2983,9 +3013,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @see #prepareCommit
*/
@Override
- public final void commit() throws IOException {
+ public final long commit() throws IOException {
ensureOpen();
- commitInternal(config.getMergePolicy());
+ // nocommit should we put seq no into sis?
+ return commitInternal(config.getMergePolicy());
}
/** Returns true if there may be changes that have not been
@@ -3001,7 +3032,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
return changeCount.get() != lastCommitChangeCount || docWriter.anyChanges() || bufferedUpdatesStream.any();
}
- private final void commitInternal(MergePolicy mergePolicy) throws IOException {
+ private final long commitInternal(MergePolicy mergePolicy) throws IOException {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: start");
@@ -3014,18 +3045,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
infoStream.message("IW", "commit: enter lock");
}
+ long seqNo;
+
if (pendingCommit == null) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: now prepare");
}
- prepareCommitInternal(mergePolicy);
+ seqNo = prepareCommitInternal(mergePolicy);
} else {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "commit: already prepared");
}
+ seqNo = pendingSeqNo;
}
finishCommit();
+
+ return seqNo;
}
}
@@ -3167,7 +3203,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
- anyChanges = docWriter.flushAllThreads();
+ long seqNo = docWriter.flushAllThreads();
+ if (seqNo < 0) {
+ seqNo = -seqNo;
+ anyChanges = true;
+ } else {
+ anyChanges = false;
+ }
if (!anyChanges) {
// flushCount is incremented in flushAllThreads
flushCount.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java b/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
index c0425c6..e2f81da 100644
--- a/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
@@ -421,7 +421,7 @@ public final class StandardDirectoryReader extends DirectoryReader {
@Override
public String toString() {
- return "DirectoryReader.ReaderCommit(" + segmentsFileName + ")";
+ return "StandardDirectoryReader.ReaderCommit(" + segmentsFileName + " files=" + files + ")";
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
index 67da49d..b50f53c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
@@ -37,6 +37,7 @@ import org.apache.lucene.store.Directory;
*
* @lucene.experimental */
+// nocommit removeme
public class TrackingIndexWriter {
private final IndexWriter writer;
private final AtomicLong indexingGen = new AtomicLong(1);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java b/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java
index b574dbd..ffa284e 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java
@@ -34,7 +34,7 @@ public interface TwoPhaseCommit {
* 2-phase commit fails, {@link #rollback()} is called to discard all changes
* since last successful commit.
*/
- public void prepareCommit() throws IOException;
+ public long prepareCommit() throws IOException;
/**
* The second phase of a 2-phase commit. Implementations should ideally do
@@ -42,7 +42,7 @@ public interface TwoPhaseCommit {
* after it returns, the caller can assume that the changes were successfully
* committed to the underlying storage.
*/
- public void commit() throws IOException;
+ public long commit() throws IOException;
/**
* Discards any changes that have occurred since the last commit. In a 2-phase
@@ -50,6 +50,7 @@ public interface TwoPhaseCommit {
* {@link #prepareCommit()}, this method is used to roll all other objects
* back to their previous state.
*/
+ // nocommit return long?
public void rollback() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88dd1aad/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
new file mode 100644
index 0000000..198ff52
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.index;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+
+public class TestIndexingSequenceNumbers extends LuceneTestCase {
+
+ public void testBasic() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+ long a = w.addDocument(new Document());
+ long b = w.addDocument(new Document());
+ assertTrue(b > a);
+ w.close();
+ dir.close();
+ }
+
+ public void testAfterRefresh() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+ long a = w.addDocument(new Document());
+ DirectoryReader.open(w).close();
+ long b = w.addDocument(new Document());
+ assertTrue(b > a);
+ w.close();
+ dir.close();
+ }
+
+ public void testAfterCommit() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+ long a = w.addDocument(new Document());
+ w.commit();
+ long b = w.addDocument(new Document());
+ assertTrue(b > a);
+ w.close();
+ dir.close();
+ }
+
+ public void testStressUpdateSameID() throws Exception {
+ int iters = atLeast(100);
+ for(int iter=0;iter<iters;iter++) {
+ Directory dir = newDirectory();
+ // nocommit use RandomIndexWriter
+ final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+ Thread[] threads = new Thread[TestUtil.nextInt(random(), 2, 5)];
+ final CountDownLatch startingGun = new CountDownLatch(1);
+ final long[] seqNos = new long[threads.length];
+ final Term id = new Term("id", "id");
+ // multiple threads update the same document
+ for(int i=0;i<threads.length;i++) {
+ final int threadID = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Document doc = new Document();
+ doc.add(new StoredField("thread", threadID));
+ doc.add(new StringField("id", "id", Field.Store.NO));
+ startingGun.await();
+ for(int j=0;j<100;j++) {
+ seqNos[threadID] = w.updateDocument(id, doc);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ threads[i].start();
+ }
+ startingGun.countDown();
+ for(Thread thread : threads) {
+ thread.join();
+ }
+
+ // now confirm that the reported sequence numbers agree with the index:
+ int maxThread = 0;
+ Set<Long> allSeqNos = new HashSet<>();
+ for(int i=0;i<threads.length;i++) {
+ allSeqNos.add(seqNos[i]);
+ if (seqNos[i] > seqNos[maxThread]) {
+ maxThread = i;
+ }
+ }
+ // make sure all sequence numbers were different
+ assertEquals(threads.length, allSeqNos.size());
+ DirectoryReader r = DirectoryReader.open(w);
+ IndexSearcher s = newSearcher(r);
+ TopDocs hits = s.search(new TermQuery(id), 1);
+ assertEquals(1, hits.totalHits);
+ Document doc = r.document(hits.scoreDocs[0].doc);
+ assertEquals(maxThread, doc.getField("thread").numericValue().intValue());
+ r.close();
+ w.close();
+ dir.close();
+ }
+ }
+
+ static class Operation {
+ // 0 = update, 1 = delete, 2 = commit
+ byte what;
+ int id;
+ int threadID;
+ long seqNo;
+ }
+
+ public void testStressConcurrentCommit() throws Exception {
+ final int opCount = atLeast(10000);
+ final int idCount = TestUtil.nextInt(random(), 10, 1000);
+
+ Directory dir = newDirectory();
+ // nocommit use RandomIndexWriter
+ IndexWriterConfig iwc = newIndexWriterConfig();
+ iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+ final IndexWriter w = new IndexWriter(dir, iwc);
+ final int numThreads = TestUtil.nextInt(random(), 2, 5);
+ Thread[] threads = new Thread[numThreads];
+ //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
+ final CountDownLatch startingGun = new CountDownLatch(1);
+ List<List<Operation>> threadOps = new ArrayList<>();
+
+ Object commitLock = new Object();
+ final List<Operation> commits = new ArrayList<>();
+ final AtomicInteger opsSinceCommit = new AtomicInteger();
+
+ // multiple threads update the same set of documents, and we randomly commit
+ for(int i=0;i<threads.length;i++) {
+ final List<Operation> ops = new ArrayList<>();
+ threadOps.add(ops);
+ final int threadID = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ startingGun.await();
+ for(int i=0;i<opCount;i++) {
+ Operation op = new Operation();
+ op.threadID = threadID;
+ if (random().nextInt(500) == 17) {
+ op.what = 2;
+ synchronized(commitLock) {
+ // nocommit why does this sometimes fail :)
+ //if (w.hasUncommittedChanges()) {
+ if (opsSinceCommit.get() > numThreads) {
+ op.seqNo = w.commit();
+ commits.add(op);
+ opsSinceCommit.set(0);
+ }
+ //System.out.println("done commit seqNo=" + op.seqNo);
+ }
+ } else {
+ op.id = random().nextInt(idCount);
+ Term idTerm = new Term("id", "" + op.id);
+ if (random().nextInt(10) == 1) {
+ op.what = 1;
+ op.seqNo = w.deleteDocuments(idTerm);
+ } else {
+ Document doc = new Document();
+ doc.add(new StoredField("thread", threadID));
+ doc.add(new StringField("id", "" + op.id, Field.Store.NO));
+ op.seqNo = w.updateDocument(idTerm, doc);
+ op.what = 2;
+ }
+ ops.add(op);
+ opsSinceCommit.getAndIncrement();
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ threads[i].start();
+ }
+ startingGun.countDown();
+ for(Thread thread : threads) {
+ thread.join();
+ }
+
+ Operation commitOp = new Operation();
+ synchronized(commitLock) {
+ commitOp.seqNo = w.commit();
+ commits.add(commitOp);
+ }
+
+ List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
+ assertEquals(commits.size(), indexCommits.size());
+
+ int[] expectedThreadIDs = new int[idCount];
+ long[] seqNos = new long[idCount];
+
+ //System.out.println("TEST: " + commits.size() + " commits");
+ for(int i=0;i<commits.size();i++) {
+ // this commit point should reflect all operations <= this seqNo
+ long commitSeqNo = commits.get(i).seqNo;
+ //System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
+
+ Arrays.fill(expectedThreadIDs, -1);
+ Arrays.fill(seqNos, 0);
+
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ long lastSeqNo = 0;
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
+ seqNos[op.id] = op.seqNo;
+ if (op.what == 2) {
+ expectedThreadIDs[op.id] = threadID;
+ } else {
+ expectedThreadIDs[op.id] = -1;
+ }
+ }
+
+ assertTrue(op.seqNo >= lastSeqNo);
+ lastSeqNo = op.seqNo;
+ }
+ }
+
+ DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
+ IndexSearcher s = new IndexSearcher(r);
+
+ for(int id=0;id<idCount;id++) {
+ //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
+ TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1);
+
+ if (expectedThreadIDs[id] != -1) {
+ assertEquals(1, hits.totalHits);
+ Document doc = r.document(hits.scoreDocs[0].doc);
+ int actualThreadID = doc.getField("thread").numericValue().intValue();
+ if (expectedThreadIDs[id] != actualThreadID) {
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ for(Operation op : threadOps.get(threadID)) {
+ if (id == op.id) {
+ System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "deleted"));
+ }
+ }
+ }
+ assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
+ }
+ } else if (hits.totalHits != 0) {
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ for(Operation op : threadOps.get(threadID)) {
+ if (id == op.id) {
+ System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "del"));
+ }
+ }
+ }
+ assertEquals(0, hits.totalHits);
+ }
+ }
+ w.close();
+ r.close();
+ }
+
+ dir.close();
+ }
+
+ // nocommit test that does n ops across threads, then does it again with a single index / single thread, and assert indices are the same
+}
[07/15] lucene-solr:branch_6x: sequence numbers: remove dead variable,
improve comments
Posted by mi...@apache.org.
sequence numbers: remove dead variable, improve comments
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7a03c649
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7a03c649
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7a03c649
Branch: refs/heads/branch_6x
Commit: 7a03c649695b0f8f79621dbbc538246f72367c97
Parents: 818ed49
Author: Mike McCandless <mi...@apache.org>
Authored: Thu May 26 15:41:39 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:38 2016 -0400
----------------------------------------------------------------------
.../org/apache/lucene/index/DocumentsWriterDeleteQueue.java | 4 ++--
.../org/apache/lucene/index/DocumentsWriterFlushControl.java | 5 ++---
2 files changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a03c649/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index f14c783..80d2c85 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -77,17 +77,17 @@ final class DocumentsWriterDeleteQueue implements Accountable {
private final DeleteSlice globalSlice;
private final BufferedUpdates globalBufferedUpdates;
- private long gen;
// only acquired to update the global deletes, pkg-private for access by tests:
final ReentrantLock globalBufferLock = new ReentrantLock();
final long generation;
+ /** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
final AtomicLong seqNo;
DocumentsWriterDeleteQueue() {
- // seqNo must start at 1 because some APIs negate this to encode a boolean
+ // seqNo must start at 1 because some APIs negate this to also return a boolean
this(0, 1);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7a03c649/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index bd8015d..ffcb7dc 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -481,11 +481,10 @@ final class DocumentsWriterFlushControl implements Accountable {
// we do another full flush
//System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
- // jump over any possible in flight ops:
- seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
-
// Insert a gap in seqNo of current active thread count, in the worst case those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
+ seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
+
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
documentsWriter.deleteQueue = newQueue;
[02/15] lucene-solr:branch_6x: cutover all IW APIs that change the
index to return seq no
Posted by mi...@apache.org.
cutover all IW APIs that change the index to return seq no
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7ee1f422
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7ee1f422
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7ee1f422
Branch: refs/heads/branch_6x
Commit: 7ee1f4224581a9891c9e389edd9abe94c4e730e0
Parents: 88dd1aa
Author: Mike McCandless <mi...@apache.org>
Authored: Tue May 24 19:45:40 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:06 2016 -0400
----------------------------------------------------------------------
.../apache/lucene/index/DocumentsWriter.java | 37 +++++++----
.../index/DocumentsWriterFlushControl.java | 2 +
.../lucene/index/DocumentsWriterPerThread.java | 13 ++--
.../org/apache/lucene/index/IndexWriter.java | 70 ++++++++++++++------
.../lucene/index/TrackingIndexWriter.java | 18 +----
.../lucene/index/TestIndexWriterDelete.java | 4 +-
.../index/TestIndexingSequenceNumbers.java | 30 ++++++---
.../apache/lucene/index/TestRollingUpdates.java | 2 +-
.../lucene/index/TestTwoPhaseCommitTool.java | 16 +++--
.../TestControlledRealTimeReopenThread.java | 6 +-
10 files changed, 119 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index bbbef71..6b698db 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -136,13 +136,15 @@ final class DocumentsWriter implements Closeable, Accountable {
flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream);
}
- synchronized boolean deleteQueries(final Query... queries) throws IOException {
+ synchronized long deleteQueries(final Query... queries) throws IOException {
// TODO why is this synchronized?
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
- deleteQueue.addDelete(queries);
+ long seqNo = deleteQueue.addDelete(queries);
flushControl.doOnDelete();
- // nocommit long
- return applyAllDeletes(deleteQueue);
+ if (applyAllDeletes(deleteQueue)) {
+ seqNo = -seqNo;
+ }
+ return seqNo;
}
// TODO: we could check w/ FreqProxTermsWriter: if the
@@ -251,6 +253,10 @@ final class DocumentsWriter implements Closeable, Accountable {
abortedDocCount += abortThreadState(perThread);
}
deleteQueue.clear();
+
+ // jump over any possible in flight ops:
+ deleteQueue.seqNo.addAndGet(perThreadPool.getActiveThreadStateCount()+1);
+
flushControl.abortPendingFlushes();
flushControl.waitForFlush();
success = true;
@@ -397,13 +403,14 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
- boolean updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
- final Term delTerm) throws IOException, AbortingException {
+ long updateDocuments(final Iterable<? extends Iterable<? extends IndexableField>> docs, final Analyzer analyzer,
+ final Term delTerm) throws IOException, AbortingException {
boolean hasEvents = preUpdate();
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
-
+ final long seqNo;
+
try {
// This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
@@ -413,7 +420,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
- dwpt.updateDocuments(docs, analyzer, delTerm);
+ seqNo = dwpt.updateDocuments(docs, analyzer, delTerm);
} catch (AbortingException ae) {
flushControl.doOnAbort(perThread);
dwpt.abort();
@@ -430,7 +437,11 @@ final class DocumentsWriter implements Closeable, Accountable {
perThreadPool.release(perThread);
}
- return postUpdate(flushingDWPT, hasEvents);
+ if (postUpdate(flushingDWPT, hasEvents)) {
+ return -seqNo;
+ } else {
+ return seqNo;
+ }
}
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
@@ -441,7 +452,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
- final long seqno;
+ final long seqNo;
try {
// This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
@@ -451,7 +462,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
- seqno = dwpt.updateDocument(doc, analyzer, delTerm);
+ seqNo = dwpt.updateDocument(doc, analyzer, delTerm);
} catch (AbortingException ae) {
flushControl.doOnAbort(perThread);
dwpt.abort();
@@ -469,9 +480,9 @@ final class DocumentsWriter implements Closeable, Accountable {
}
if (postUpdate(flushingDWPT, hasEvents)) {
- return -seqno;
+ return -seqNo;
} else {
- return seqno;
+ return seqNo;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index ec390d9..f388f46 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -480,6 +480,8 @@ final class DocumentsWriterFlushControl implements Accountable {
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
//System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
+
+ // jump over any possible in flight ops:
seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
// nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index ab00662..5b1afa0 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -244,7 +244,7 @@ class DocumentsWriterPerThread {
return finishDocument(delTerm);
}
- public int updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
+ public long updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
testPoint("DocumentsWriterPerThread addDocuments start");
assert deleteQueue != null;
docState.analyzer = analyzer;
@@ -285,13 +285,17 @@ class DocumentsWriterPerThread {
// Apply delTerm only after all indexing has
// succeeded, but apply it only to docs prior to when
// this batch started:
+ long seqNo;
if (delTerm != null) {
- deleteQueue.add(delTerm, deleteSlice);
+ seqNo = deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
+ return seqNo;
+ } else {
+ seqNo = deleteQueue.seqNo.get();
}
- // nocommit return seqNo here
+ return seqNo;
} finally {
if (!allDocsIndexed && !aborted) {
@@ -306,8 +310,6 @@ class DocumentsWriterPerThread {
}
docState.clear();
}
-
- return docCount;
}
private long finishDocument(Term delTerm) {
@@ -326,7 +328,6 @@ class DocumentsWriterPerThread {
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
applySlice &= deleteQueue.updateSlice(deleteSlice);
- // nocommit we don't need to increment here?
seqNo = deleteQueue.seqNo.get();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 1682135..8609365 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1332,8 +1332,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* @lucene.experimental
*/
- public void addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
- updateDocuments(null, docs);
+ public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
+ return updateDocuments(null, docs);
}
/**
@@ -1349,15 +1349,18 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* @lucene.experimental
*/
- public void updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
+ public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
ensureOpen();
try {
boolean success = false;
try {
- if (docWriter.updateDocuments(docs, analyzer, delTerm)) {
+ long seqNo = docWriter.updateDocuments(docs, analyzer, delTerm);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
processEvents(true, false);
}
success = true;
+ return seqNo;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
@@ -1367,6 +1370,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (AbortingException | VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocuments");
+
+ // dead code but javac disagrees
+ return -1;
}
}
@@ -1375,15 +1381,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* DirectoryReader#open(IndexWriter)}). If the
* provided reader is an NRT reader obtained from this
* writer, and its segment has not been merged away, then
- * the delete succeeds and this method returns true; else, it
- * returns false the caller must then separately delete by
- * Term or Query.
+ * the delete succeeds and this method returns a valid (> 0) sequence
+ * number; else, it returns -1 and the caller must then
+ * separately delete by Term or Query.
*
* <b>NOTE</b>: this method can only delete documents
* visible to the currently open NRT reader. If you need
* to delete documents indexed after opening the NRT
* reader you must use {@link #deleteDocuments(Term...)}). */
- public synchronized boolean tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
+ public synchronized long tryDeleteDocument(IndexReader readerIn, int docID) throws IOException {
final LeafReader reader;
if (readerIn instanceof LeafReader) {
@@ -1434,7 +1440,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
changed();
}
//System.out.println(" yes " + info.info.name + " " + docID);
- return true;
+
+ return docWriter.deleteQueue.seqNo.getAndIncrement();
}
} else {
//System.out.println(" no rld " + info.info.name + " " + docID);
@@ -1442,7 +1449,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} else {
//System.out.println(" no seg " + info.info.name + " " + docID);
}
- return false;
+
+ return -1;
}
/**
@@ -1481,23 +1489,29 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
- public void deleteDocuments(Query... queries) throws IOException {
+ public long deleteDocuments(Query... queries) throws IOException {
ensureOpen();
// LUCENE-6379: Specialize MatchAllDocsQuery
for(Query query : queries) {
if (query.getClass() == MatchAllDocsQuery.class) {
- deleteAll();
- return;
+ return deleteAll();
}
}
try {
- if (docWriter.deleteQueries(queries)) {
+ long seqNo = docWriter.deleteQueries(queries);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
processEvents(true, false);
}
+
+ return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteDocuments(Query..)");
+
+ // dead code but javac disagrees:
+ return -1;
}
}
@@ -2225,7 +2239,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* or {@link #forceMergeDeletes} methods, they may receive
* {@link MergePolicy.MergeAbortedException}s.
*/
- public void deleteAll() throws IOException {
+ public long deleteAll() throws IOException {
ensureOpen();
// Remove any buffered docs
boolean success = false;
@@ -2272,6 +2286,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
globalFieldNumberMap.clear();
success = true;
+ return docWriter.deleteQueue.seqNo.get();
+
} finally {
docWriter.unlockAllAfterAbortAll(this);
if (!success) {
@@ -2284,6 +2300,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "deleteAll");
+
+ // dead code but javac disagrees
+ return -1;
}
}
@@ -2511,7 +2530,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* the index to exceed {@link #MAX_DOCS}, or if the incoming
* index sort does not match this index's index sort
*/
- public void addIndexes(Directory... dirs) throws IOException {
+ public long addIndexes(Directory... dirs) throws IOException {
ensureOpen();
noDupDirs(dirs);
@@ -2618,6 +2637,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
maybeMerge();
+
+ // no need to increment:
+ return docWriter.deleteQueue.seqNo.get();
}
/**
@@ -2649,7 +2671,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws IllegalArgumentException
* if addIndexes would cause the index to exceed {@link #MAX_DOCS}
*/
- public void addIndexes(CodecReader... readers) throws IOException {
+ public long addIndexes(CodecReader... readers) throws IOException {
ensureOpen();
// long so we can detect int overflow:
@@ -2691,7 +2713,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) {
- return;
+ // no need to increment:
+ return docWriter.deleteQueue.seqNo.get();
}
merger.merge(); // merge 'em
@@ -2709,7 +2732,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (stopMerges) {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
- return;
+
+ // no need to increment:
+ return docWriter.deleteQueue.seqNo.get();
}
ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@@ -2744,7 +2769,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (stopMerges) {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
- return;
+
+ // no need to increment:
+ return docWriter.deleteQueue.seqNo.get();
}
ensureOpen();
@@ -2758,6 +2785,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
tragicEvent(tragedy, "addIndexes(CodecReader...)");
}
maybeMerge();
+
+ // no need to increment:
+ return docWriter.deleteQueue.seqNo.get();
}
/** Copies the segment files as-is into the IndexWriter's directory. */
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
index b50f53c..33c193b 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
@@ -68,28 +68,12 @@ public class TrackingIndexWriter {
/** Calls {@link IndexWriter#deleteDocuments(Term...)} and
* returns the generation that reflects this change. */
- public long deleteDocuments(Term t) throws IOException {
- writer.deleteDocuments(t);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link IndexWriter#deleteDocuments(Term...)} and
- * returns the generation that reflects this change. */
public long deleteDocuments(Term... terms) throws IOException {
writer.deleteDocuments(terms);
// Return gen as of when indexing finished:
return indexingGen.get();
}
- /** Calls {@link IndexWriter#deleteDocuments(Query...)} and
- * returns the generation that reflects this change. */
- public long deleteDocuments(Query q) throws IOException {
- writer.deleteDocuments(q);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
/** Calls {@link IndexWriter#deleteDocuments(Query...)}
* and returns the generation that reflects this change. */
public long deleteDocuments(Query... queries) throws IOException {
@@ -159,7 +143,7 @@ public class TrackingIndexWriter {
* IndexWriter#tryDeleteDocument(IndexReader,int)} and
* returns the generation that reflects this change. */
public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
- if (writer.tryDeleteDocument(reader, docID)) {
+ if (writer.tryDeleteDocument(reader, docID) != -1) {
return indexingGen.get();
} else {
return -1;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
index 14dac59..ccfe5c6 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterDelete.java
@@ -1238,8 +1238,8 @@ public class TestIndexWriterDelete extends LuceneTestCase {
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
w = new IndexWriter(d, iwc);
IndexReader r = DirectoryReader.open(w, false, false);
- assertTrue(w.tryDeleteDocument(r, 1));
- assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0));
+ assertTrue(w.tryDeleteDocument(r, 1) != -1);
+ assertTrue(w.tryDeleteDocument(r.leaves().get(0).reader(), 0) != -1);
r.close();
w.close();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index 198ff52..f2e9636 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -43,7 +43,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
long b = w.addDocument(new Document());
- assertTrue(b > a);
+ assertTrue(b >= a);
w.close();
dir.close();
}
@@ -154,7 +154,6 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
Object commitLock = new Object();
final List<Operation> commits = new ArrayList<>();
- final AtomicInteger opsSinceCommit = new AtomicInteger();
// multiple threads update the same set of documents, and we randomly commit
for(int i=0;i<threads.length;i++) {
@@ -172,12 +171,9 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
if (random().nextInt(500) == 17) {
op.what = 2;
synchronized(commitLock) {
- // nocommit why does this sometimes fail :)
- //if (w.hasUncommittedChanges()) {
- if (opsSinceCommit.get() > numThreads) {
+ if (w.hasUncommittedChanges()) {
op.seqNo = w.commit();
commits.add(op);
- opsSinceCommit.set(0);
}
//System.out.println("done commit seqNo=" + op.seqNo);
}
@@ -186,16 +182,25 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
Term idTerm = new Term("id", "" + op.id);
if (random().nextInt(10) == 1) {
op.what = 1;
- op.seqNo = w.deleteDocuments(idTerm);
+ if (random().nextBoolean()) {
+ op.seqNo = w.deleteDocuments(idTerm);
+ } else {
+ op.seqNo = w.deleteDocuments(new TermQuery(idTerm));
+ }
} else {
Document doc = new Document();
doc.add(new StoredField("thread", threadID));
doc.add(new StringField("id", "" + op.id, Field.Store.NO));
- op.seqNo = w.updateDocument(idTerm, doc);
+ if (random().nextBoolean()) {
+ List<Document> docs = new ArrayList<>();
+ docs.add(doc);
+ op.seqNo = w.updateDocuments(idTerm, docs);
+ } else {
+ op.seqNo = w.updateDocument(idTerm, doc);
+ }
op.what = 2;
}
ops.add(op);
- opsSinceCommit.getAndIncrement();
}
}
} catch (Exception e) {
@@ -210,11 +215,14 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
thread.join();
}
- Operation commitOp = new Operation();
- synchronized(commitLock) {
+ /*
+ // nocommit: why does this make the assertEquals angry...?
+ if (w.hasUncommittedChanges()) {
+ Operation commitOp = new Operation();
commitOp.seqNo = w.commit();
commits.add(commitOp);
}
+ */
List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
assertEquals(commits.size(), indexCommits.size());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
index 23be40b..5a2c82f 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestRollingUpdates.java
@@ -80,7 +80,7 @@ public class TestRollingUpdates extends LuceneTestCase {
if (s != null && updateCount < SIZE) {
TopDocs hits = s.search(new TermQuery(idTerm), 1);
assertEquals(1, hits.totalHits);
- doUpdate = !w.tryDeleteDocument(r, hits.scoreDocs[0].doc);
+ doUpdate = w.tryDeleteDocument(r, hits.scoreDocs[0].doc) == -1;
if (VERBOSE) {
if (doUpdate) {
System.out.println(" tryDeleteDocument failed");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java b/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java
index be6e4f8..def90f2 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java
@@ -40,29 +40,33 @@ public class TestTwoPhaseCommitTool extends LuceneTestCase {
}
@Override
- public void prepareCommit() throws IOException {
- prepareCommit(null);
+ public long prepareCommit() throws IOException {
+ return prepareCommit(null);
}
- public void prepareCommit(Map<String, String> commitData) throws IOException {
+ public long prepareCommit(Map<String, String> commitData) throws IOException {
this.prepareCommitData = commitData;
assertFalse("commit should not have been called before all prepareCommit were", commitCalled);
if (failOnPrepare) {
throw new IOException("failOnPrepare");
}
+ // nocommit hmm
+ return -1;
}
@Override
- public void commit() throws IOException {
- commit(null);
+ public long commit() throws IOException {
+ return commit(null);
}
- public void commit(Map<String, String> commitData) throws IOException {
+ public long commit(Map<String, String> commitData) throws IOException {
this.commitData = commitData;
commitCalled = true;
if (failOnCommit) {
throw new RuntimeException("failOnCommit");
}
+ // nocommit hmm
+ return -1;
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7ee1f422/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
index 8217eb1..2ef954c 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
@@ -389,14 +389,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
super(d, conf);
this.latch = latch;
this.signal = signal;
-
}
@Override
- public void updateDocument(Term term,
+ public long updateDocument(Term term,
Iterable<? extends IndexableField> doc)
throws IOException {
- super.updateDocument(term, doc);
+ long result = super.updateDocument(term, doc);
try {
if (waitAfterUpdate) {
signal.countDown();
@@ -405,6 +404,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
+ return result;
}
}
[10/15] lucene-solr:branch_6x: sequence numbers: add test case for
updating numeric doc values
Posted by mi...@apache.org.
sequence numbers: add test case for updating numeric doc values
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/88c761b4
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/88c761b4
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/88c761b4
Branch: refs/heads/branch_6x
Commit: 88c761b4a009d1e07771566163566a1805aabe61
Parents: 9ae8140
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jun 1 18:50:14 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:58 2016 -0400
----------------------------------------------------------------------
.../index/TestIndexingSequenceNumbers.java | 184 +++++++++++++++++--
1 file changed, 172 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/88c761b4/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index 52c05d3..23389dd 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -17,16 +17,9 @@
package org.apache.lucene.index;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.IndexSearcher;
@@ -38,6 +31,14 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class TestIndexingSequenceNumbers extends LuceneTestCase {
public void testBasic() throws Exception {
@@ -206,7 +207,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
} else {
op.seqNo = w.updateDocument(idTerm, doc);
}
- op.what = 2;
+ op.what = 0;
}
ops.add(op);
}
@@ -249,7 +250,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
for(Operation op : threadOps.get(threadID)) {
if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
seqNos[op.id] = op.seqNo;
- if (op.what == 2) {
+ if (op.what == 0) {
expectedThreadIDs[op.id] = threadID;
} else {
expectedThreadIDs[op.id] = -1;
@@ -302,6 +303,167 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
dir.close();
}
+ public void testStressConcurrentDocValuesUpdatesCommit() throws Exception {
+ final int opCount = atLeast(10000);
+ final int idCount = TestUtil.nextInt(random(), 10, 1000);
+
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig();
+ iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+
+ // Cannot use RIW since it randomly commits:
+ final IndexWriter w = new IndexWriter(dir, iwc);
+
+ final int numThreads = TestUtil.nextInt(random(), 2, 10);
+ Thread[] threads = new Thread[numThreads];
+ //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
+ final CountDownLatch startingGun = new CountDownLatch(1);
+ List<List<Operation>> threadOps = new ArrayList<>();
+
+ Object commitLock = new Object();
+ final List<Operation> commits = new ArrayList<>();
+
+ List<Operation> ops1 = new ArrayList<>();
+ threadOps.add(ops1);
+
+ for(int id=0;id<idCount;id++) {
+ int threadID = 0;
+ Operation op = new Operation();
+ op.threadID = threadID;
+ op.id = id;
+
+ Document doc = new Document();
+ doc.add(new StoredField("thread", threadID));
+ doc.add(new NumericDocValuesField("thread", threadID));
+ doc.add(new StringField("id", "" + id, Field.Store.NO));
+ op.seqNo = w.addDocument(doc);
+ ops1.add(op);
+ }
+
+ // multiple threads update the same set of documents, and we randomly commit
+ for(int i=0;i<threads.length;i++) {
+ final List<Operation> ops;
+ if (i == 0) {
+ ops = threadOps.get(0);
+ } else {
+ ops = new ArrayList<>();
+ threadOps.add(ops);
+ }
+
+ final int threadID = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ startingGun.await();
+ for(int i=0;i<opCount;i++) {
+ Operation op = new Operation();
+ op.threadID = threadID;
+ if (random().nextInt(500) == 17) {
+ op.what = 2;
+ synchronized(commitLock) {
+ op.seqNo = w.commit();
+ if (op.seqNo != -1) {
+ commits.add(op);
+ }
+ }
+ } else {
+ op.id = random().nextInt(idCount);
+ Term idTerm = new Term("id", "" + op.id);
+ op.seqNo = w.updateNumericDocValue(idTerm, "thread", threadID);
+ op.what = 0;
+ ops.add(op);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ threads[i].start();
+ }
+ startingGun.countDown();
+ for(Thread thread : threads) {
+ thread.join();
+ }
+
+ Operation commitOp = new Operation();
+ commitOp.seqNo = w.commit();
+ if (commitOp.seqNo != -1) {
+ commits.add(commitOp);
+ }
+
+ List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
+ assertEquals(commits.size(), indexCommits.size());
+
+ int[] expectedThreadIDs = new int[idCount];
+ long[] seqNos = new long[idCount];
+
+ //System.out.println("TEST: " + commits.size() + " commits");
+ for(int i=0;i<commits.size();i++) {
+ // this commit point should reflect all operations <= this seqNo
+ long commitSeqNo = commits.get(i).seqNo;
+ //System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
+
+ Arrays.fill(expectedThreadIDs, -1);
+ Arrays.fill(seqNos, 0);
+
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ long lastSeqNo = 0;
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
+ seqNos[op.id] = op.seqNo;
+ if (op.what == 0) {
+ expectedThreadIDs[op.id] = threadID;
+ }
+ }
+
+ assertTrue(op.seqNo > lastSeqNo);
+ lastSeqNo = op.seqNo;
+ }
+ }
+
+ DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
+ IndexSearcher s = new IndexSearcher(r);
+ NumericDocValues docValues = MultiDocValues.getNumericValues(r, "thread");
+
+ for(int id=0;id<idCount;id++) {
+ //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
+ TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1);
+
+ if (expectedThreadIDs[id] != -1) {
+ assertEquals(1, hits.totalHits);
+ int actualThreadID = (int) docValues.get(id);
+ if (expectedThreadIDs[id] != actualThreadID) {
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ for(Operation op : threadOps.get(threadID)) {
+ if (id == op.id) {
+ System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "deleted"));
+ }
+ }
+ }
+ assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
+ }
+ } else if (hits.totalHits != 0) {
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ for(Operation op : threadOps.get(threadID)) {
+ if (id == op.id) {
+ System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "del"));
+ }
+ }
+ }
+ assertEquals(0, hits.totalHits);
+ }
+ }
+ w.close();
+ r.close();
+ }
+
+ dir.close();
+ }
+
public void testStressConcurrentAddAndDeleteAndCommit() throws Exception {
final int opCount = atLeast(10000);
final int idCount = TestUtil.nextInt(random(), 10, 1000);
@@ -478,6 +640,4 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
w.close();
dir.close();
}
-
- // nocommit test doc values updates
}
[08/15] lucene-solr:branch_6x: sequence numbers: always increment seq
no (even for addDocument/s); add tests; add javadocs;
make DWDQ's seqNo private
Posted by mi...@apache.org.
sequence numbers: always increment seq no (even for addDocument/s); add tests; add javadocs; make DWDQ's seqNo private
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/5361d679
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/5361d679
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/5361d679
Branch: refs/heads/branch_6x
Commit: 5361d67996a35c34f467e25f339e7c6a655db0d5
Parents: 7a03c64
Author: Mike McCandless <mi...@apache.org>
Authored: Fri May 27 06:11:07 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:43 2016 -0400
----------------------------------------------------------------------
.../apache/lucene/index/DocumentsWriter.java | 2 +-
.../index/DocumentsWriterDeleteQueue.java | 22 ++-
.../index/DocumentsWriterFlushControl.java | 4 +-
.../lucene/index/DocumentsWriterPerThread.java | 4 +-
.../org/apache/lucene/index/IndexWriter.java | 90 +++++++++--
.../index/TestIndexingSequenceNumbers.java | 155 ++++++++++++++++++-
6 files changed, 249 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5361d679/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 9f1bdd3..5630fbb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -259,7 +259,7 @@ final class DocumentsWriter implements Closeable, Accountable {
deleteQueue.clear();
// jump over any possible in flight ops:
- deleteQueue.seqNo.addAndGet(perThreadPool.getActiveThreadStateCount()+1);
+ deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount()+1);
flushControl.abortPendingFlushes();
flushControl.waitForFlush();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5361d679/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index 80d2c85..4a11599 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -84,7 +84,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
final long generation;
/** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
- final AtomicLong seqNo;
+ private final AtomicLong nextSeqNo;
DocumentsWriterDeleteQueue() {
// seqNo must start at 1 because some APIs negate this to also return a boolean
@@ -98,7 +98,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
this.globalBufferedUpdates = globalBufferedUpdates;
this.generation = generation;
- this.seqNo = new AtomicLong(startSeqNo);
+ this.nextSeqNo = new AtomicLong(startSeqNo);
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
@@ -168,10 +168,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
/*
* now that we are done we need to advance the tail
*/
- long mySeqNo = seqNo.getAndIncrement();
+ long seqNo = getNextSequenceNumber();
boolean result = tailUpdater.compareAndSet(this, currentTail, newNode);
assert result;
- return mySeqNo;
+ return seqNo;
}
}
}
@@ -460,6 +460,16 @@ final class DocumentsWriterDeleteQueue implements Accountable {
public String toString() {
return "DWDQ: [ generation: " + generation + " ]";
}
-
-
+
+ public long getNextSequenceNumber() {
+ return nextSeqNo.getAndIncrement();
+ }
+
+ public long getLastSequenceNumber() {
+ return nextSeqNo.get()-1;
+ }
+
+ public void skipSequenceNumbers(long jump) {
+ nextSeqNo.addAndGet(jump);
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5361d679/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index ffcb7dc..99bf8d8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -481,9 +481,9 @@ final class DocumentsWriterFlushControl implements Accountable {
// we do another full flush
//System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
- // Insert a gap in seqNo of current active thread count, in the worst case those threads now have one operation in flight. It's fine
+    // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now has one operation in flight.  It's fine
// if we have some sequence numbers that were never assigned:
- seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
+ seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5361d679/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 5b1afa0..cf5694d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -292,7 +292,7 @@ class DocumentsWriterPerThread {
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
return seqNo;
} else {
- seqNo = deleteQueue.seqNo.get();
+ seqNo = deleteQueue.getNextSequenceNumber();
}
return seqNo;
@@ -328,7 +328,7 @@ class DocumentsWriterPerThread {
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
applySlice &= deleteQueue.updateSlice(deleteSlice);
- seqNo = deleteQueue.seqNo.get();
+ seqNo = deleteQueue.getNextSequenceNumber();
}
if (applySlice) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5361d679/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 945399c..2bdfea7 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -95,6 +95,14 @@ import org.apache.lucene.util.Version;
and then adds the entire document). When finished adding, deleting
and updating documents, {@link #close() close} should be called.</p>
+  <a name="sequence_number"></a>
+ <p>Each method that changes the index returns a {@code long} sequence number, which
+ expresses the effective order in which each change was applied.
+ {@link #commit} also returns a sequence number, describing which
+ changes are in the commit point and which are not. Sequence numbers
+ are transient (not saved into the index in any way) and only valid
+ within a single {@code IndexWriter} instance.</p>
+
<a name="flush"></a>
<p>These changes are buffered in memory and periodically
flushed to the {@link Directory} (during the above method
@@ -1288,6 +1296,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* replaced with the Unicode replacement character
* U+FFFD.</p>
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
@@ -1327,6 +1338,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* and will likely break them up. Use such tools at your
* own risk!
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
@@ -1344,6 +1358,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* See {@link #addDocuments(Iterable)}.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
@@ -1441,7 +1458,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
//System.out.println(" yes " + info.info.name + " " + docID);
- return docWriter.deleteQueue.seqNo.getAndIncrement();
+ return docWriter.deleteQueue.getNextSequenceNumber();
}
} else {
//System.out.println(" no rld " + info.info.name + " " + docID);
@@ -1458,6 +1475,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* terms. All given deletes are applied and flushed atomically
* at the same time.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @param terms array of terms to identify the documents
* to be deleted
* @throws CorruptIndexException if the index is corrupt
@@ -1484,6 +1504,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* Deletes the document(s) matching any of the provided queries.
* All given deletes are applied and flushed atomically at the same time.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @param queries array of queries to identify the documents
* to be deleted
* @throws CorruptIndexException if the index is corrupt
@@ -1522,6 +1545,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* by a reader on the same index (flush may happen only after
* the add).
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @param term the term to identify the document(s) to be
* deleted
* @param doc the document to be added
@@ -1566,6 +1592,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* field name of the {@link NumericDocValues} field
* @param value
* new value for the field
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@@ -1606,6 +1636,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* field name of the {@link BinaryDocValues} field
* @param value
* new value for the field
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@@ -1642,6 +1676,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* @param updates
* the updates to apply
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@@ -2256,6 +2294,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* threads are running {@link #forceMerge}, {@link #addIndexes(CodecReader[])}
* or {@link #forceMergeDeletes} methods, they may receive
* {@link MergePolicy.MergeAbortedException}s.
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
*/
public long deleteAll() throws IOException {
ensureOpen();
@@ -2304,7 +2345,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
globalFieldNumberMap.clear();
success = true;
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.getNextSequenceNumber();
} finally {
docWriter.unlockAllAfterAbortAll(this);
@@ -2542,6 +2583,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* <p>This requires this index not be among those to be added.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
* @throws IllegalArgumentException if addIndexes would cause
@@ -2559,6 +2603,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean successTop = false;
+ long seqNo;
+
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush at addIndexes(Directory...)");
@@ -2630,6 +2676,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Now reserve the docs, just before we update SIS:
reserveDocs(totalMaxDoc);
+ seqNo = docWriter.deleteQueue.getNextSequenceNumber();
+
success = true;
} finally {
if (!success) {
@@ -2647,6 +2695,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(Directory...)");
+ // dead code but javac disagrees:
+ seqNo = -1;
} finally {
if (successTop) {
IOUtils.close(locks);
@@ -2656,8 +2706,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
maybeMerge();
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return seqNo;
}
/**
@@ -2682,6 +2731,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* {@code maxMergeAtOnce} parameter, you should pass that many readers in one
* call.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@@ -2697,6 +2749,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
Sort indexSort = config.getIndexSort();
+ long seqNo;
+
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush at addIndexes(CodecReader...)");
@@ -2731,8 +2785,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) {
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.getNextSequenceNumber();
}
merger.merge(); // merge 'em
@@ -2751,8 +2804,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.getNextSequenceNumber();
}
ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@@ -2788,8 +2840,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.getNextSequenceNumber();
}
ensureOpen();
@@ -2797,15 +2848,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
reserveDocs(numDocs);
segmentInfos.add(infoPerCommit);
+ seqNo = docWriter.deleteQueue.getNextSequenceNumber();
checkpoint();
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(CodecReader...)");
+ // dead code but javac disagrees:
+ seqNo = -1;
}
maybeMerge();
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return seqNo;
}
/** Copies the segment files as-is into the IndexWriter's directory. */
@@ -2873,6 +2926,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* <p>You can also just call {@link #commit()} directly
* without prepareCommit first in which case that method
* will internally call prepareCommit.
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * of the last operation in the commit. All sequence numbers <= this value
+ * will be reflected in the commit, and all others will not.
*/
@Override
public final long prepareCommit() throws IOException {
@@ -3069,6 +3126,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* point, and all other operations will not. </p>
*
* @see #prepareCommit
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * of the last operation in the commit. All sequence numbers <= this value
+ * will be reflected in the commit, and all others will not.
*/
@Override
public final long commit() throws IOException {
@@ -4988,11 +5049,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
};
}
- /** Returns the last sequence number.
+ /** Returns the last <a href="#sequence_number">sequence number</a>, or 0
+ * if no index-changing operations have completed yet.
*
* @lucene.experimental */
public long getLastSequenceNumber() {
ensureOpen();
- return docWriter.deleteQueue.seqNo.get()-1;
+ return docWriter.deleteQueue.getLastSequenceNumber();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5361d679/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index fb9b9ab..002292c 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -43,7 +43,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
long b = w.addDocument(new Document());
- assertTrue(b >= a);
+ assertTrue(b > a);
w.close();
dir.close();
}
@@ -129,7 +129,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
static class Operation {
- // 0 = update, 1 = delete, 2 = commit
+ // 0 = update, 1 = delete, 2 = commit, 3 = add
byte what;
int id;
int threadID;
@@ -248,7 +248,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
}
- assertTrue(op.seqNo >= lastSeqNo);
+ assertTrue(op.seqNo > lastSeqNo);
lastSeqNo = op.seqNo;
}
}
@@ -293,4 +293,153 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
dir.close();
}
+
+ public void testStressConcurrentAddAndDeleteAndCommit() throws Exception {
+ final int opCount = atLeast(10000);
+ final int idCount = TestUtil.nextInt(random(), 10, 1000);
+
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig();
+ iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+
+ // Cannot use RIW since it randomly commits:
+ final IndexWriter w = new IndexWriter(dir, iwc);
+
+ final int numThreads = TestUtil.nextInt(random(), 2, 5);
+ Thread[] threads = new Thread[numThreads];
+ //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
+ final CountDownLatch startingGun = new CountDownLatch(1);
+ List<List<Operation>> threadOps = new ArrayList<>();
+
+ Object commitLock = new Object();
+ final List<Operation> commits = new ArrayList<>();
+
+ // multiple threads update the same set of documents, and we randomly commit
+ for(int i=0;i<threads.length;i++) {
+ final List<Operation> ops = new ArrayList<>();
+ threadOps.add(ops);
+ final int threadID = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ startingGun.await();
+ for(int i=0;i<opCount;i++) {
+ Operation op = new Operation();
+ op.threadID = threadID;
+ if (random().nextInt(500) == 17) {
+ op.what = 2;
+ synchronized(commitLock) {
+ op.seqNo = w.commit();
+ if (op.seqNo != -1) {
+ commits.add(op);
+ }
+ }
+ } else {
+ op.id = random().nextInt(idCount);
+ Term idTerm = new Term("id", "" + op.id);
+ if (random().nextInt(10) == 1) {
+ op.what = 1;
+ if (random().nextBoolean()) {
+ op.seqNo = w.deleteDocuments(idTerm);
+ } else {
+ op.seqNo = w.deleteDocuments(new TermQuery(idTerm));
+ }
+ } else {
+ Document doc = new Document();
+ doc.add(new StoredField("thread", threadID));
+ doc.add(new StringField("id", "" + op.id, Field.Store.NO));
+ if (random().nextBoolean()) {
+ List<Document> docs = new ArrayList<>();
+ docs.add(doc);
+ op.seqNo = w.addDocuments(docs);
+ } else {
+ op.seqNo = w.addDocument(doc);
+ }
+ op.what = 3;
+ }
+ ops.add(op);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ threads[i].start();
+ }
+ startingGun.countDown();
+ for(Thread thread : threads) {
+ thread.join();
+ }
+
+ Operation commitOp = new Operation();
+ commitOp.seqNo = w.commit();
+ if (commitOp.seqNo != -1) {
+ commits.add(commitOp);
+ }
+
+ List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
+ assertEquals(commits.size(), indexCommits.size());
+
+ // how many docs with this id are expected:
+ int[] expectedCounts = new int[idCount];
+ long[] lastDelSeqNos = new long[idCount];
+
+ //System.out.println("TEST: " + commits.size() + " commits");
+ for(int i=0;i<commits.size();i++) {
+ // this commit point should reflect all operations <= this seqNo
+ long commitSeqNo = commits.get(i).seqNo;
+ //System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
+
+ // first find the highest seqNo of the last delete op, for each id, prior to this commit:
+ Arrays.fill(lastDelSeqNos, -1);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ long lastSeqNo = 0;
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.what == 1 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) {
+ lastDelSeqNos[op.id] = op.seqNo;
+ }
+
+ // within one thread the seqNos must only increase:
+ assertTrue(op.seqNo > lastSeqNo);
+ lastSeqNo = op.seqNo;
+ }
+ }
+
+ // then count how many adds happened since the last delete and before this commit:
+ Arrays.fill(expectedCounts, 0);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.what == 3 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) {
+ expectedCounts[op.id]++;
+ }
+ }
+ }
+
+ DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
+ IndexSearcher s = new IndexSearcher(r);
+
+ for(int id=0;id<idCount;id++) {
+ //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
+ assertEquals(expectedCounts[id], s.count(new TermQuery(new Term("id", ""+id))));
+ }
+ w.close();
+ r.close();
+ }
+
+ dir.close();
+ }
+
+ public void testDeleteAll() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+ long a = w.addDocument(new Document());
+ long b = w.deleteAll();
+ assertTrue(a < b);
+ long c = w.commit();
+ assertTrue(b < c);
+ w.close();
+ dir.close();
+ }
}
[06/15] lucene-solr:branch_6x: sequence numbers: removed synchronized
in DocumentsWriterDeleteQueue.add
Posted by mi...@apache.org.
sequence numbers: removed synchronized in DocumentsWriterDeleteQueue.add
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/818ed490
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/818ed490
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/818ed490
Branch: refs/heads/branch_6x
Commit: 818ed49098165229ac9ce093977184a1d4175b6f
Parents: 4ee0b49
Author: Mike McCandless <mi...@apache.org>
Authored: Thu May 26 05:39:12 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:30 2016 -0400
----------------------------------------------------------------------
.../index/DocumentsWriterDeleteQueue.java | 29 +++++++++++++++++---
1 file changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/818ed490/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index abb735d..f14c783 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -150,10 +150,31 @@ final class DocumentsWriterDeleteQueue implements Accountable {
return seqNo;
}
- synchronized long add(Node<?> newNode) {
- tail.next = newNode;
- tail = newNode;
- return seqNo.getAndIncrement();
+ long add(Node<?> newNode) {
+ /*
+ * this non-blocking / 'wait-free' linked list add was inspired by Apache
+ * Harmony's ConcurrentLinkedQueue Implementation.
+ */
+ while (true) {
+ final Node<?> currentTail = tail;
+ final Node<?> tailNext = currentTail.next;
+ if (tail == currentTail && tailNext == null) {
+ /*
+ * we are in quiescent state and can try to insert the newNode to the
+ * current tail if we fail to insert we just retry the operation since
+ * somebody else has already added its newNode
+ */
+ if (currentTail.casNext(null, newNode)) {
+ /*
+ * now that we are done we need to advance the tail
+ */
+ long mySeqNo = seqNo.getAndIncrement();
+ boolean result = tailUpdater.compareAndSet(this, currentTail, newNode);
+ assert result;
+ return mySeqNo;
+ }
+ }
+ }
}
boolean anyChanges() {
[11/15] lucene-solr:branch_6x: sequence numbers: add CHANGES entry
Posted by mi...@apache.org.
sequence numbers: add CHANGES entry
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0b9c0dd5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0b9c0dd5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0b9c0dd5
Branch: refs/heads/branch_6x
Commit: 0b9c0dd52275a5bd9cf4d35d32eecf777e7d744d
Parents: 88c761b
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jun 1 18:54:55 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:07:05 2016 -0400
----------------------------------------------------------------------
lucene/CHANGES.txt | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0b9c0dd5/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 425d6e2..0935333 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -45,6 +45,10 @@ New Features
applicable and supported when copying files from another FSDirectory in
Directory#copyFrom. (Simon Willnauer)
+* LUCENE-73012: IndexWriter methods that change the index now return a
+ long "sequence number" indicating the effective equivalent
+ single-threaded execution order (Mike McCandless)
+
API Changes
* LUCENE-7163: refactor GeoRect, Polygon, and GeoUtils tests to geo
[13/15] lucene-solr:branch_6x: sequence numbers: fix issue number in
CHANGES
Posted by mi...@apache.org.
sequence numbers: fix issue number in CHANGES
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1216b694
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1216b694
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1216b694
Branch: refs/heads/branch_6x
Commit: 1216b69414006245ee1076e3523cf63d72710d87
Parents: 4bdd08a
Author: Mike McCandless <mi...@apache.org>
Authored: Thu Jun 2 05:59:10 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:07:15 2016 -0400
----------------------------------------------------------------------
lucene/CHANGES.txt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1216b694/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 0935333..350bee1 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -45,7 +45,7 @@ New Features
applicable and supported when copying files from another FSDirectory in
Directory#copyFrom. (Simon Willnauer)
-* LUCENE-73012: IndexWriter methods that change the index now return a
+* LUCENE-7302: IndexWriter methods that change the index now return a
long "sequence number" indicating the effective equivalent
single-threaded execution order (Mike McCandless)
[04/15] lucene-solr:branch_6x: cutover RandomIndexWriter to return
sequence numbers
Posted by mi...@apache.org.
cutover RandomIndexWriter to return sequence numbers
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/69c28eac
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/69c28eac
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/69c28eac
Branch: refs/heads/branch_6x
Commit: 69c28eac11b267e1eb7e4b3e5e98288b40ca166f
Parents: 67c35ae
Author: Mike McCandless <mi...@apache.org>
Authored: Tue May 24 20:22:47 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:17 2016 -0400
----------------------------------------------------------------------
.../apache/lucene/index/DocumentsWriter.java | 10 ++-
.../org/apache/lucene/index/IndexWriter.java | 31 +++++++---
.../org/apache/lucene/index/TwoPhaseCommit.java | 2 -
.../lucene/index/TestTwoPhaseCommitTool.java | 6 +-
.../directory/DirectoryTaxonomyWriter.java | 8 +--
.../apache/lucene/index/RandomIndexWriter.java | 64 +++++++++++---------
6 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/69c28eac/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 6b698db..9f1bdd3 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -161,11 +161,15 @@ final class DocumentsWriter implements Closeable, Accountable {
return seqNo;
}
- synchronized boolean updateDocValues(DocValuesUpdate... updates) throws IOException {
+ synchronized long updateDocValues(DocValuesUpdate... updates) throws IOException {
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
- deleteQueue.addDocValuesUpdates(updates);
+ long seqNo = deleteQueue.addDocValuesUpdates(updates);
flushControl.doOnDelete();
- return applyAllDeletes(deleteQueue);
+ if (applyAllDeletes(deleteQueue)) {
+ seqNo = -seqNo;
+ }
+
+ return seqNo;
}
DocumentsWriterDeleteQueue currentDeleteSession() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/69c28eac/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 4791a15..9a520b1 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1571,17 +1571,23 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws IOException
* if there is a low-level IO error
*/
- public void updateNumericDocValue(Term term, String field, long value) throws IOException {
+ public long updateNumericDocValue(Term term, String field, long value) throws IOException {
ensureOpen();
if (!globalFieldNumberMap.contains(field, DocValuesType.NUMERIC)) {
throw new IllegalArgumentException("can only update existing numeric-docvalues fields!");
}
try {
- if (docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value))) {
+ long seqNo = docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value));
+ if (seqNo < 0) {
+ seqNo = -seqNo;
processEvents(true, false);
}
+ return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateNumericDocValue");
+
+ // dead code but javac disagrees:
+ return -1;
}
}
@@ -1605,7 +1611,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws IOException
* if there is a low-level IO error
*/
- public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
+ public long updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
ensureOpen();
if (value == null) {
throw new IllegalArgumentException("cannot update a field to a null value: " + field);
@@ -1614,11 +1620,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
throw new IllegalArgumentException("can only update existing binary-docvalues fields!");
}
try {
- if (docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value))) {
+ long seqNo = docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value));
+ if (seqNo < 0) {
+ seqNo = -seqNo;
processEvents(true, false);
}
+ return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateBinaryDocValue");
+
+ // dead code but javac disagrees:
+ return -1;
}
}
@@ -1635,7 +1647,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @throws IOException
* if there is a low-level IO error
*/
- public void updateDocValues(Term term, Field... updates) throws IOException {
+ public long updateDocValues(Term term, Field... updates) throws IOException {
ensureOpen();
DocValuesUpdate[] dvUpdates = new DocValuesUpdate[updates.length];
for (int i = 0; i < updates.length; i++) {
@@ -1662,11 +1674,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
}
try {
- if (docWriter.updateDocValues(dvUpdates)) {
+ long seqNo = docWriter.updateDocValues(dvUpdates);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
processEvents(true, false);
}
+ return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocValues");
+
+ // dead code but javac disagrees:
+ return -1;
}
}
@@ -3045,7 +3063,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
@Override
public final long commit() throws IOException {
ensureOpen();
- // nocommit should we put seq no into sis?
return commitInternal(config.getMergePolicy());
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/69c28eac/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java b/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java
index ffa284e..c4ba78c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java
+++ b/lucene/core/src/java/org/apache/lucene/index/TwoPhaseCommit.java
@@ -50,7 +50,5 @@ public interface TwoPhaseCommit {
* {@link #prepareCommit()}, this method is used to roll all other objects
* back to their previous state.
*/
- // nocommit return long?
public void rollback() throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/69c28eac/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java b/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java
index def90f2..9ef5a30 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestTwoPhaseCommitTool.java
@@ -50,8 +50,7 @@ public class TestTwoPhaseCommitTool extends LuceneTestCase {
if (failOnPrepare) {
throw new IOException("failOnPrepare");
}
- // nocommit hmm
- return -1;
+ return 1;
}
@Override
@@ -65,8 +64,7 @@ public class TestTwoPhaseCommitTool extends LuceneTestCase {
if (failOnCommit) {
throw new RuntimeException("failOnCommit");
}
- // nocommit hmm
- return -1;
+ return 1;
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/69c28eac/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
----------------------------------------------------------------------
diff --git a/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java b/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
index 3899a12..8e0841e 100644
--- a/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
+++ b/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
@@ -581,14 +581,14 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
}
@Override
- public synchronized void commit() throws IOException {
+ public synchronized long commit() throws IOException {
ensureOpen();
// LUCENE-4972: if we always call setCommitData, we create empty commits
String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH);
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData()));
}
- indexWriter.commit();
+ return indexWriter.commit();
}
/** Combine original user data with the taxonomy epoch. */
@@ -616,14 +616,14 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
* See {@link IndexWriter#prepareCommit}.
*/
@Override
- public synchronized void prepareCommit() throws IOException {
+ public synchronized long prepareCommit() throws IOException {
ensureOpen();
// LUCENE-4972: if we always call setCommitData, we create empty commits
String epochStr = indexWriter.getCommitData().get(INDEX_EPOCH);
if (epochStr == null || Long.parseLong(epochStr, 16) != indexEpoch) {
indexWriter.setCommitData(combinedCommitData(indexWriter.getCommitData()));
}
- indexWriter.prepareCommit();
+ return indexWriter.prepareCommit();
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/69c28eac/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
index 0f67882..d46c248 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java
@@ -130,14 +130,15 @@ public class RandomIndexWriter implements Closeable {
* Adds a Document.
* @see IndexWriter#addDocument(Iterable)
*/
- public <T extends IndexableField> void addDocument(final Iterable<T> doc) throws IOException {
+ public <T extends IndexableField> long addDocument(final Iterable<T> doc) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
+ long seqNo;
if (r.nextInt(5) == 3) {
// TODO: maybe, we should simply buffer up added docs
// (but we need to clone them), and only when
// getReader, commit, etc. are called, we do an
// addDocuments? Would be better testing.
- w.addDocuments(new Iterable<Iterable<T>>() {
+ seqNo = w.addDocuments(new Iterable<Iterable<T>>() {
@Override
public Iterator<Iterable<T>> iterator() {
@@ -167,10 +168,12 @@ public class RandomIndexWriter implements Closeable {
}
});
} else {
- w.addDocument(doc);
+ seqNo = w.addDocument(doc);
}
maybeFlushOrCommit();
+
+ return seqNo;
}
private void maybeFlushOrCommit() throws IOException {
@@ -195,26 +198,29 @@ public class RandomIndexWriter implements Closeable {
}
}
- public void addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
+ public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.addDocuments(docs);
+ long seqNo = w.addDocuments(docs);
maybeFlushOrCommit();
+ return seqNo;
}
- public void updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
+ public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.updateDocuments(delTerm, docs);
+ long seqNo = w.updateDocuments(delTerm, docs);
maybeFlushOrCommit();
+ return seqNo;
}
/**
* Updates a document.
* @see IndexWriter#updateDocument(Term, Iterable)
*/
- public <T extends IndexableField> void updateDocument(Term t, final Iterable<T> doc) throws IOException {
+ public <T extends IndexableField> long updateDocument(Term t, final Iterable<T> doc) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
+ long seqNo;
if (r.nextInt(5) == 3) {
- w.updateDocuments(t, new Iterable<Iterable<T>>() {
+ seqNo = w.updateDocuments(t, new Iterable<Iterable<T>>() {
@Override
public Iterator<Iterable<T>> iterator() {
@@ -243,49 +249,51 @@ public class RandomIndexWriter implements Closeable {
}
});
} else {
- w.updateDocument(t, doc);
+ seqNo = w.updateDocument(t, doc);
}
maybeFlushOrCommit();
+
+ return seqNo;
}
- public void addIndexes(Directory... dirs) throws IOException {
+ public long addIndexes(Directory... dirs) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.addIndexes(dirs);
+ return w.addIndexes(dirs);
}
- public void addIndexes(CodecReader... readers) throws IOException {
+ public long addIndexes(CodecReader... readers) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.addIndexes(readers);
+ return w.addIndexes(readers);
}
- public void updateNumericDocValue(Term term, String field, Long value) throws IOException {
+ public long updateNumericDocValue(Term term, String field, Long value) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.updateNumericDocValue(term, field, value);
+ return w.updateNumericDocValue(term, field, value);
}
- public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
+ public long updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.updateBinaryDocValue(term, field, value);
+ return w.updateBinaryDocValue(term, field, value);
}
- public void updateDocValues(Term term, Field... updates) throws IOException {
+ public long updateDocValues(Term term, Field... updates) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.updateDocValues(term, updates);
+ return w.updateDocValues(term, updates);
}
- public void deleteDocuments(Term term) throws IOException {
+ public long deleteDocuments(Term term) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.deleteDocuments(term);
+ return w.deleteDocuments(term);
}
- public void deleteDocuments(Query q) throws IOException {
+ public long deleteDocuments(Query q) throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.deleteDocuments(q);
+ return w.deleteDocuments(q);
}
- public void commit() throws IOException {
+ public long commit() throws IOException {
LuceneTestCase.maybeChangeLiveIndexWriterConfig(r, w.getConfig());
- w.commit();
+ return w.commit();
}
public int numDocs() {
@@ -296,8 +304,8 @@ public class RandomIndexWriter implements Closeable {
return w.maxDoc();
}
- public void deleteAll() throws IOException {
- w.deleteAll();
+ public long deleteAll() throws IOException {
+ return w.deleteAll();
}
public DirectoryReader getReader() throws IOException {
[09/15] lucene-solr:branch_6x: sequence numbers: fix concurrency bug
in add/updateDocuments; improve test debuggability;
simplify delete queue concurrency
Posted by mi...@apache.org.
sequence numbers: fix concurrency bug in add/updateDocuments; improve test debuggability; simplify delete queue concurrency
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9ae81407
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9ae81407
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9ae81407
Branch: refs/heads/branch_6x
Commit: 9ae814073ccd985e3f1e44604f1bfb987d2d4e08
Parents: 5361d67
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jun 1 18:27:45 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:51 2016 -0400
----------------------------------------------------------------------
.../apache/lucene/index/BufferedUpdates.java | 5 +-
.../apache/lucene/index/DocumentsWriter.java | 3 +-
.../index/DocumentsWriterDeleteQueue.java | 69 ++++++++++----------
.../index/DocumentsWriterFlushControl.java | 7 +-
.../lucene/index/DocumentsWriterPerThread.java | 22 +++++--
.../index/DocumentsWriterPerThreadPool.java | 1 +
.../index/TestDocumentsWriterDeleteQueue.java | 6 +-
.../lucene/index/TestIndexWriterConfig.java | 1 -
.../index/TestIndexingSequenceNumbers.java | 50 ++++++++++++--
9 files changed, 106 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
index b59c616..1c3494f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
@@ -158,9 +158,12 @@ class BufferedUpdates {
private final static boolean VERBOSE_DELETES = false;
long gen;
+
+ final String segmentName;
- public BufferedUpdates() {
+ public BufferedUpdates(String segmentName) {
this.bytesUsed = new AtomicLong();
+ this.segmentName = segmentName;
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 5630fbb..13800a8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -626,7 +626,7 @@ final class DocumentsWriter implements Closeable, Accountable {
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
* delete queue */
- seqNo = flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
+ seqNo = flushControl.markForFullFlush(); // swaps this.deleteQueue synced on FlushControl
assert setFlushingDeleteQueue(flushingDeleteQueue);
}
assert currentFullFlushDelQueue != null;
@@ -676,7 +676,6 @@ final class DocumentsWriter implements Closeable, Accountable {
} finally {
pendingChangesInCurrentFullFlush = false;
}
-
}
public LiveIndexWriterConfig getIndexWriterConfig() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index 4a11599..dac2e4c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -69,12 +69,15 @@ import org.apache.lucene.util.BytesRef;
*/
final class DocumentsWriterDeleteQueue implements Accountable {
+ // the current end (latest delete operation) in the delete queue:
private volatile Node<?> tail;
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DocumentsWriterDeleteQueue,Node> tailUpdater = AtomicReferenceFieldUpdater
.newUpdater(DocumentsWriterDeleteQueue.class, Node.class, "tail");
+ /** Used to record deletes against all prior (already written to disk) segments. Whenever any segment flushes, we bundle up this set of
+ * deletes and insert into the buffered updates stream before the newly flushed segment(s). */
private final DeleteSlice globalSlice;
private final BufferedUpdates globalBufferedUpdates;
@@ -85,6 +88,9 @@ final class DocumentsWriterDeleteQueue implements Accountable {
/** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
private final AtomicLong nextSeqNo;
+
+ // for asserts
+ long maxSeqNo = Long.MAX_VALUE;
DocumentsWriterDeleteQueue() {
// seqNo must start at 1 because some APIs negate this to also return a boolean
@@ -92,7 +98,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
}
DocumentsWriterDeleteQueue(long generation, long startSeqNo) {
- this(new BufferedUpdates(), generation, startSeqNo);
+ this(new BufferedUpdates("global"), generation, startSeqNo);
}
DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
@@ -130,7 +136,6 @@ final class DocumentsWriterDeleteQueue implements Accountable {
*/
long add(Term term, DeleteSlice slice) {
final TermNode termNode = new TermNode(term);
-// System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
long seqNo = add(termNode);
/*
* this is an update request where the term is the updated documents
@@ -150,31 +155,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
return seqNo;
}
- long add(Node<?> newNode) {
- /*
- * this non-blocking / 'wait-free' linked list add was inspired by Apache
- * Harmony's ConcurrentLinkedQueue Implementation.
- */
- while (true) {
- final Node<?> currentTail = tail;
- final Node<?> tailNext = currentTail.next;
- if (tail == currentTail && tailNext == null) {
- /*
- * we are in quiescent state and can try to insert the newNode to the
- * current tail if we fail to insert we just retry the operation since
- * somebody else has already added its newNode
- */
- if (currentTail.casNext(null, newNode)) {
- /*
- * now that we are done we need to advance the tail
- */
- long seqNo = getNextSequenceNumber();
- boolean result = tailUpdater.compareAndSet(this, currentTail, newNode);
- assert result;
- return seqNo;
- }
- }
- }
+ synchronized long add(Node<?> newNode) {
+ tail.next = newNode;
+ this.tail = newNode;
+ return getNextSequenceNumber();
}
boolean anyChanges() {
@@ -185,8 +169,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
* and if the global slice is up-to-date
* and if globalBufferedUpdates has changes
*/
- return globalBufferedUpdates.any() || !globalSlice.isEmpty() || globalSlice.sliceTail != tail
- || tail.next != null;
+ return globalBufferedUpdates.any() || !globalSlice.isEmpty() || globalSlice.sliceTail != tail || tail.next != null;
} finally {
globalBufferLock.unlock();
}
@@ -201,8 +184,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
* tail the next time we can get the lock!
*/
try {
- if (updateSlice(globalSlice)) {
-// System.out.println(Thread.currentThread() + ": apply globalSlice");
+ if (updateSliceNoSeqNo(globalSlice)) {
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
} finally {
@@ -231,7 +213,6 @@ final class DocumentsWriterDeleteQueue implements Accountable {
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
-// System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(globalBufferedUpdates, false);
globalBufferedUpdates.clear();
return packet;
@@ -244,8 +225,21 @@ final class DocumentsWriterDeleteQueue implements Accountable {
return new DeleteSlice(tail);
}
- boolean updateSlice(DeleteSlice slice) {
- if (slice.sliceTail != tail) { // If we are the same just
+ /** Negative result means there were new deletes since we last applied */
+ synchronized long updateSlice(DeleteSlice slice) {
+ long seqNo = getNextSequenceNumber();
+ if (slice.sliceTail != tail) {
+ // new deletes arrived since we last checked
+ slice.sliceTail = tail;
+ seqNo = -seqNo;
+ }
+ return seqNo;
+ }
+
+ /** Just like updateSlice, but does not assign a sequence number */
+ boolean updateSliceNoSeqNo(DeleteSlice slice) {
+ if (slice.sliceTail != tail) {
+ // new deletes arrived since we last checked
slice.sliceTail = tail;
return true;
}
@@ -283,7 +277,6 @@ final class DocumentsWriterDeleteQueue implements Accountable {
current = current.next;
assert current != null : "slice property violated between the head on the tail must not be a null node";
current.apply(del, docIDUpto);
-// System.out.println(Thread.currentThread().getName() + ": pull " + current + " docIDUpto=" + docIDUpto);
} while (current != sliceTail);
reset();
}
@@ -462,13 +455,17 @@ final class DocumentsWriterDeleteQueue implements Accountable {
}
public long getNextSequenceNumber() {
- return nextSeqNo.getAndIncrement();
+ long seqNo = nextSeqNo.getAndIncrement();
+ assert seqNo < maxSeqNo: "seqNo=" + seqNo + " vs maxSeqNo=" + maxSeqNo;
+ return seqNo;
}
public long getLastSequenceNumber() {
return nextSeqNo.get()-1;
}
+ /** Inserts a gap in the sequence numbers. This is used by IW during flush or commit to ensure any in-flight threads get sequence numbers
+ * inside the gap */
public void skipSequenceNumbers(long jump) {
nextSeqNo.addAndGet(jump);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 99bf8d8..a8c1dc3 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -190,7 +190,7 @@ final class DocumentsWriterFlushControl implements Accountable {
flushingDWPT = null;
}
} else {
- flushingDWPT = tryCheckoutForFlush(perThread);
+ flushingDWPT = tryCheckoutForFlush(perThread);
}
return flushingDWPT;
} finally {
@@ -452,8 +452,7 @@ final class DocumentsWriterFlushControl implements Accountable {
.currentThread(), documentsWriter);
boolean success = false;
try {
- if (perThread.isInitialized()
- && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
+ if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
// another DWPT:
@@ -479,11 +478,11 @@ final class DocumentsWriterFlushControl implements Accountable {
flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
- //System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
// Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
+ flushingQueue.maxSeqNo = seqNo+1;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index cf5694d..e72145c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -171,7 +171,7 @@ class DocumentsWriterPerThread {
this.pendingNumDocs = pendingNumDocs;
bytesUsed = Counter.newCounter();
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
- pendingUpdates = new BufferedUpdates();
+ pendingUpdates = new BufferedUpdates(segmentName);
intBlockAllocator = new IntBlockAllocator(bytesUsed);
this.deleteQueue = deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
@@ -278,7 +278,8 @@ class DocumentsWriterPerThread {
numDocsInRAM++;
}
}
- finishDocument(null);
+
+ numDocsInRAM++;
}
allDocsIndexed = true;
@@ -292,7 +293,13 @@ class DocumentsWriterPerThread {
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
return seqNo;
} else {
- seqNo = deleteQueue.getNextSequenceNumber();
+ seqNo = deleteQueue.updateSlice(deleteSlice);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
+ deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
+ } else {
+ deleteSlice.reset();
+ }
}
return seqNo;
@@ -327,8 +334,13 @@ class DocumentsWriterPerThread {
seqNo = deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
- applySlice &= deleteQueue.updateSlice(deleteSlice);
- seqNo = deleteQueue.getNextSequenceNumber();
+ seqNo = deleteQueue.updateSlice(deleteSlice);
+
+ if (seqNo < 0) {
+ seqNo = -seqNo;
+ } else {
+ applySlice = false;
+ }
}
if (applySlice) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 0b0ac84..87310fb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -226,6 +226,7 @@ final class DocumentsWriterPerThreadPool {
return threadStates.get(ord);
}
+ // TODO: merge this with getActiveThreadStateCount: they are the same!
synchronized int getMaxThreadStates() {
return threadStates.size();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
index 51e17cf..c60f54d 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
@@ -43,8 +43,8 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
}
DeleteSlice slice1 = queue.newSlice();
DeleteSlice slice2 = queue.newSlice();
- BufferedUpdates bd1 = new BufferedUpdates();
- BufferedUpdates bd2 = new BufferedUpdates();
+ BufferedUpdates bd1 = new BufferedUpdates("bd1");
+ BufferedUpdates bd2 = new BufferedUpdates("bd2");
int last1 = 0;
int last2 = 0;
Set<Term> uniqueValues = new HashSet<>();
@@ -225,7 +225,7 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
this.index = index;
this.ids = ids;
this.slice = queue.newSlice();
- deletes = new BufferedUpdates();
+ deletes = new BufferedUpdates("deletes");
this.latch = latch;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
index 2ffdce7..ec033d4 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
@@ -93,7 +93,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
getters.add("getIndexingChain");
getters.add("getMergedSegmentWarmer");
getters.add("getMergePolicy");
- getters.add("getMaxThreadStates");
getters.add("getReaderPooling");
getters.add("getIndexerThreadPool");
getters.add("getFlushPolicy");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9ae81407/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index 002292c..52c05d3 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -30,9 +30,11 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
@@ -91,7 +93,13 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
doc.add(new StringField("id", "id", Field.Store.NO));
startingGun.await();
for(int j=0;j<100;j++) {
- seqNos[threadID] = w.updateDocument(id, doc);
+ if (random().nextBoolean()) {
+ seqNos[threadID] = w.updateDocument(id, doc);
+ } else {
+ List<Document> docs = new ArrayList<>();
+ docs.add(doc);
+ seqNos[threadID] = w.updateDocuments(id, docs);
+ }
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -147,7 +155,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
// Cannot use RIW since it randomly commits:
final IndexWriter w = new IndexWriter(dir, iwc);
- final int numThreads = TestUtil.nextInt(random(), 2, 5);
+ final int numThreads = TestUtil.nextInt(random(), 2, 10);
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
final CountDownLatch startingGun = new CountDownLatch(1);
@@ -265,7 +273,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
Document doc = r.document(hits.scoreDocs[0].doc);
int actualThreadID = doc.getField("thread").numericValue().intValue();
if (expectedThreadIDs[id] != actualThreadID) {
- System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID);
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
@@ -276,7 +284,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
}
} else if (hits.totalHits != 0) {
- System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits);
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
@@ -347,7 +355,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
} else {
Document doc = new Document();
- doc.add(new StoredField("thread", threadID));
+ doc.add(new StoredField("threadop", threadID + "-" + ops.size()));
doc.add(new StringField("id", "" + op.id, Field.Store.NO));
if (random().nextBoolean()) {
List<Document> docs = new ArrayList<>();
@@ -366,6 +374,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
}
};
+ threads[i].setName("thread" + threadID);
threads[i].start();
}
startingGun.countDown();
@@ -422,7 +431,34 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
for(int id=0;id<idCount;id++) {
//System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
- assertEquals(expectedCounts[id], s.count(new TermQuery(new Term("id", ""+id))));
+ int actualCount = s.count(new TermQuery(new Term("id", ""+id)));
+ if (expectedCounts[id] != actualCount) {
+ System.out.println("TEST: FAIL r=" + r + " id=" + id + " commitSeqNo=" + commitSeqNo);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ int opCount2 = 0;
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.id == id) {
+ boolean shouldCount = op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id];
+ System.out.println(" id=" + id + " what=" + op.what + " threadop=" + threadID + "-" + opCount2 + " seqNo=" + op.seqNo + " vs lastDelSeqNo=" + lastDelSeqNos[op.id] + " shouldCount=" + shouldCount);
+ }
+ opCount2++;
+ }
+ }
+ TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1+actualCount);
+ for(ScoreDoc hit : hits.scoreDocs) {
+ System.out.println(" hit: " + s.doc(hit.doc).get("threadop"));
+ }
+
+ for(LeafReaderContext ctx : r.leaves()) {
+ System.out.println(" sub=" + ctx.reader());
+ Bits liveDocs = ctx.reader().getLiveDocs();
+ for(int docID=0;docID<ctx.reader().maxDoc();docID++) {
+ System.out.println(" docID=" + docID + " threadop=" + ctx.reader().document(docID).get("threadop") + (liveDocs != null && liveDocs.get(docID) == false ? " (deleted)" : ""));
+ }
+ }
+
+ assertEquals("commit " + i + " of " + commits.size() + " id=" + id + " reader=" + r, expectedCounts[id], actualCount);
+ }
}
w.close();
r.close();
@@ -442,4 +478,6 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
w.close();
dir.close();
}
+
+ // nocommit test doc values updates
}
[05/15] lucene-solr:branch_6x: sequence numbers: resolve last nocommits
Posted by mi...@apache.org.
sequence numbers: resolve last nocommits
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/4ee0b494
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/4ee0b494
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/4ee0b494
Branch: refs/heads/branch_6x
Commit: 4ee0b4942990d4491a19e05d0ae35db26254a6db
Parents: 69c28ea
Author: Mike McCandless <mi...@apache.org>
Authored: Wed May 25 18:41:21 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:22 2016 -0400
----------------------------------------------------------------------
.../index/DocumentsWriterDeleteQueue.java | 37 ++------------------
.../index/DocumentsWriterFlushControl.java | 4 +--
.../org/apache/lucene/index/IndexWriter.java | 20 ++++++++---
.../search/ControlledRealTimeReopenThread.java | 3 --
.../index/TestIndexingSequenceNumbers.java | 25 ++++++-------
5 files changed, 31 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4ee0b494/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index 5d0e83d..abb735d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -150,41 +150,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
return seqNo;
}
- // nocommit can we remove the sync'd
synchronized long add(Node<?> newNode) {
- /*
- * this non-blocking / 'wait-free' linked list add was inspired by Apache
- * Harmony's ConcurrentLinkedQueue Implementation.
- */
- while (true) {
- final Node<?> currentTail = this.tail;
- final Node<?> tailNext = currentTail.next;
- if (tail == currentTail) {
- if (tailNext != null) {
- /*
- * we are in intermediate state here. the tails next pointer has been
- * advanced but the tail itself might not be updated yet. help to
- * advance the tail and try again updating it.
- */
- tailUpdater.compareAndSet(this, currentTail, tailNext); // can fail
- } else {
- /*
- * we are in quiescent state and can try to insert the new node to the
- * current tail if we fail to insert we just retry the operation since
- * somebody else has already added its item
- */
- if (currentTail.casNext(null, newNode)) {
- /*
- * now that we are done we need to advance the tail while another
- * thread could have advanced it already so we can ignore the return
- * type of this CAS call
- */
- tailUpdater.compareAndSet(this, currentTail, newNode);
- return seqNo.getAndIncrement();
- }
- }
- }
- }
+ tail.next = newNode;
+ tail = newNode;
+ return seqNo.getAndIncrement();
}
boolean anyChanges() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4ee0b494/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index f388f46..bd8015d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -484,8 +484,8 @@ final class DocumentsWriterFlushControl implements Accountable {
// jump over any possible in flight ops:
seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
- // nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would
- // have to get this next delete queue?
+ // Insert a gap in seqNo of current active thread count, in the worst case those threads now have one operation in flight. It's fine
+ // if we have some sequence numbers that were never assigned:
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
documentsWriter.deleteQueue = newQueue;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4ee0b494/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 9a520b1..945399c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -426,7 +426,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean success = false;
synchronized (fullFlushLock) {
try {
- // nocommit should we make this available in the returned NRT reader?
+ // TODO: should we somehow make this available in the returned NRT reader?
long seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
anyChanges = true;
@@ -2984,7 +2984,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
startCommit(toCommit);
success = true;
- return seqNo;
+ if (pendingCommit == null) {
+ return -1;
+ } else {
+ return seqNo;
+ }
} finally {
if (!success) {
synchronized (this) {
@@ -3058,6 +3062,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* loss it may still lose data. Lucene cannot guarantee
* consistency on such devices. </p>
*
+ * <p> If nothing was committed, because there were no
+ * pending changes, this returns -1. Otherwise, it returns
+ * the sequence number such that all indexing operations
+ * prior to this sequence will be included in the commit
+ * point, and all other operations will not. </p>
+ *
* @see #prepareCommit
*/
@Override
@@ -4978,9 +4988,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
};
}
- // nocommit javadocs
+ /** Returns the last sequence number.
+ *
+ * @lucene.experimental */
public long getLastSequenceNumber() {
ensureOpen();
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.seqNo.get()-1;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4ee0b494/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
index d015ae9..466d793 100644
--- a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
+++ b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
@@ -151,9 +151,6 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
*/
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
final long curGen = writer.getLastSequenceNumber();
- if (targetGen > curGen) {
- throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")");
- }
if (targetGen > searchingGen) {
// Notify the reopen thread that the waitingGen has
// changed, so it may wake up and realize it should
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4ee0b494/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index f2e9636..fb9b9ab 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -74,8 +74,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
int iters = atLeast(100);
for(int iter=0;iter<iters;iter++) {
Directory dir = newDirectory();
- // nocommit use RandomIndexWriter
- final IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+ final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
Thread[] threads = new Thread[TestUtil.nextInt(random(), 2, 5)];
final CountDownLatch startingGun = new CountDownLatch(1);
final long[] seqNos = new long[threads.length];
@@ -117,7 +116,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
// make sure all sequence numbers were different
assertEquals(threads.length, allSeqNos.size());
- DirectoryReader r = DirectoryReader.open(w);
+ DirectoryReader r = w.getReader();
IndexSearcher s = newSearcher(r);
TopDocs hits = s.search(new TermQuery(id), 1);
assertEquals(1, hits.totalHits);
@@ -142,10 +141,12 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
final int idCount = TestUtil.nextInt(random(), 10, 1000);
Directory dir = newDirectory();
- // nocommit use RandomIndexWriter
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+
+ // Cannot use RIW since it randomly commits:
final IndexWriter w = new IndexWriter(dir, iwc);
+
final int numThreads = TestUtil.nextInt(random(), 2, 5);
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
@@ -171,11 +172,10 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
if (random().nextInt(500) == 17) {
op.what = 2;
synchronized(commitLock) {
- if (w.hasUncommittedChanges()) {
- op.seqNo = w.commit();
+ op.seqNo = w.commit();
+ if (op.seqNo != -1) {
commits.add(op);
}
- //System.out.println("done commit seqNo=" + op.seqNo);
}
} else {
op.id = random().nextInt(idCount);
@@ -215,14 +215,11 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
thread.join();
}
- /*
- // nocommit: why does this make the assertEquals angry...?
- if (w.hasUncommittedChanges()) {
- Operation commitOp = new Operation();
- commitOp.seqNo = w.commit();
+ Operation commitOp = new Operation();
+ commitOp.seqNo = w.commit();
+ if (commitOp.seqNo != -1) {
commits.add(commitOp);
}
- */
List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
assertEquals(commits.size(), indexCommits.size());
@@ -296,6 +293,4 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
dir.close();
}
-
- // nocommit test that does n ops across threads, then does it again with a single index / single thread, and assert indices are the same
}
[14/15] lucene-solr:branch_6x: sequence numbers: fix test bug
Posted by mi...@apache.org.
sequence numbers: fix test bug
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/a73a6498
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/a73a6498
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/a73a6498
Branch: refs/heads/branch_6x
Commit: a73a64989e239002ddce4db15edd2be31f3cc43d
Parents: 1216b69
Author: Mike McCandless <mi...@apache.org>
Authored: Sat Jun 4 14:11:37 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:07:20 2016 -0400
----------------------------------------------------------------------
.../org/apache/lucene/store/IndexInput.java | 5 +++
.../index/TestIndexingSequenceNumbers.java | 32 +++++++++++---------
2 files changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a73a6498/lucene/core/src/java/org/apache/lucene/store/IndexInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/IndexInput.java b/lucene/core/src/java/org/apache/lucene/store/IndexInput.java
index 8c9186c..4e36fe4 100644
--- a/lucene/core/src/java/org/apache/lucene/store/IndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/IndexInput.java
@@ -149,6 +149,11 @@ public abstract class IndexInput extends DataInput implements Cloneable,Closeabl
slice.seek(pos);
return slice.readLong();
}
+
+ @Override
+ public String toString() {
+ return "RandomAccessInput(" + IndexInput.this.toString() + ")";
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a73a6498/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index f4fc2f0..9c7a6f9 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -17,6 +17,14 @@
package org.apache.lucene.index;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
@@ -28,17 +36,10 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class TestIndexingSequenceNumbers extends LuceneTestCase {
public void testBasic() throws Exception {
@@ -316,6 +317,9 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
final IndexWriter w = new IndexWriter(dir, iwc);
final int numThreads = TestUtil.nextInt(random(), 2, 10);
+ if (VERBOSE) {
+ System.out.println("TEST: numThreads=" + numThreads);
+ }
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
final CountDownLatch startingGun = new CountDownLatch(1);
@@ -383,6 +387,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
}
};
+ threads[i].setName("thread" + i);
threads[i].start();
}
startingGun.countDown();
@@ -416,9 +421,8 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
for(Operation op : threadOps.get(threadID)) {
if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
seqNos[op.id] = op.seqNo;
- if (op.what == 0) {
- expectedThreadIDs[op.id] = threadID;
- }
+ assert op.what == 0;
+ expectedThreadIDs[op.id] = threadID;
}
assertTrue(op.seqNo > lastSeqNo);
@@ -437,9 +441,9 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
// We pre-add all ids up front:
assert expectedThreadIDs[id] != -1;
assertEquals(1, hits.totalHits);
- int actualThreadID = (int) docValues.get(id);
+ int actualThreadID = (int) docValues.get(hits.scoreDocs[0].doc);
if (expectedThreadIDs[id] != actualThreadID) {
- System.out.println("FAIL: commit=" + i + " (of " + commits.size() + ") id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
+ System.out.println("FAIL: commit=" + i + " (of " + commits.size() + ") id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads + " reader=" + r + " commit=" + indexCommits.get(i));
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
@@ -447,7 +451,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
}
}
- assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
+ assertEquals("id=" + id + " docID=" + hits.scoreDocs[0].doc, expectedThreadIDs[id], actualThreadID);
}
}
w.close();
[03/15] lucene-solr:branch_6x: remove TrackingIndexWriter
Posted by mi...@apache.org.
remove TrackingIndexWriter
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/67c35aed
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/67c35aed
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/67c35aed
Branch: refs/heads/branch_6x
Commit: 67c35aedb918a870b849d98e54be8afbba2e9932
Parents: 7ee1f42
Author: Mike McCandless <mi...@apache.org>
Authored: Tue May 24 19:58:33 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:06:12 2016 -0400
----------------------------------------------------------------------
.../org/apache/lucene/index/IndexWriter.java | 6 +
.../lucene/index/TrackingIndexWriter.java | 153 -------------------
.../search/ControlledRealTimeReopenThread.java | 15 +-
.../org/apache/lucene/index/TestTryDelete.java | 17 +--
.../TestControlledRealTimeReopenThread.java | 19 +--
5 files changed, 25 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67c35aed/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 8609365..4791a15 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -4960,4 +4960,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
};
}
+
+ // nocommit javadocs
+ public long getLastSequenceNumber() {
+ ensureOpen();
+ return docWriter.deleteQueue.seqNo.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67c35aed/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
deleted file mode 100644
index 33c193b..0000000
--- a/lucene/core/src/java/org/apache/lucene/index/TrackingIndexWriter.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.index;
-
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.search.ControlledRealTimeReopenThread; // javadocs
-import org.apache.lucene.search.Query;
-import org.apache.lucene.store.Directory;
-
-/** Class that tracks changes to a delegated
- * IndexWriter, used by {@link
- * ControlledRealTimeReopenThread} to ensure specific
- * changes are visible. Create this class (passing your
- * IndexWriter), and then pass this class to {@link
- * ControlledRealTimeReopenThread}.
- * Be sure to make all changes via the
- * TrackingIndexWriter, otherwise {@link
- * ControlledRealTimeReopenThread} won't know about the changes.
- *
- * @lucene.experimental */
-
-// nocommit removeme
-public class TrackingIndexWriter {
- private final IndexWriter writer;
- private final AtomicLong indexingGen = new AtomicLong(1);
-
- /** Create a {@code TrackingIndexWriter} wrapping the
- * provided {@link IndexWriter}. */
- public TrackingIndexWriter(IndexWriter writer) {
- this.writer = writer;
- }
-
- /** Calls {@link
- * IndexWriter#updateDocument(Term,Iterable)} and
- * returns the generation that reflects this change. */
- public long updateDocument(Term t, Iterable<? extends IndexableField> d) throws IOException {
- writer.updateDocument(t, d);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link
- * IndexWriter#updateDocuments(Term,Iterable)} and returns
- * the generation that reflects this change. */
- public long updateDocuments(Term t, Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
- writer.updateDocuments(t, docs);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link IndexWriter#deleteDocuments(Term...)} and
- * returns the generation that reflects this change. */
- public long deleteDocuments(Term... terms) throws IOException {
- writer.deleteDocuments(terms);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link IndexWriter#deleteDocuments(Query...)}
- * and returns the generation that reflects this change. */
- public long deleteDocuments(Query... queries) throws IOException {
- writer.deleteDocuments(queries);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link IndexWriter#deleteAll} and returns the
- * generation that reflects this change. */
- public long deleteAll() throws IOException {
- writer.deleteAll();
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link IndexWriter#addDocument(Iterable)}
- * and returns the generation that reflects this change. */
- public long addDocument(Iterable<? extends IndexableField> d) throws IOException {
- writer.addDocument(d);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link IndexWriter#addDocuments(Iterable)} and
- * returns the generation that reflects this change. */
- public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
- writer.addDocuments(docs);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link IndexWriter#addIndexes(Directory...)} and
- * returns the generation that reflects this change. */
- public long addIndexes(Directory... dirs) throws IOException {
- writer.addIndexes(dirs);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Calls {@link IndexWriter#addIndexes(CodecReader...)}
- * and returns the generation that reflects this change. */
- public long addIndexes(CodecReader... readers) throws IOException {
- writer.addIndexes(readers);
- // Return gen as of when indexing finished:
- return indexingGen.get();
- }
-
- /** Return the current generation being indexed. */
- public long getGeneration() {
- return indexingGen.get();
- }
-
- /** Return the wrapped {@link IndexWriter}. */
- public IndexWriter getIndexWriter() {
- return writer;
- }
-
- /** Return and increment current gen.
- *
- * @lucene.internal */
- public long getAndIncrementGeneration() {
- return indexingGen.getAndIncrement();
- }
-
- /** Calls {@link
- * IndexWriter#tryDeleteDocument(IndexReader,int)} and
- * returns the generation that reflects this change. */
- public long tryDeleteDocument(IndexReader reader, int docID) throws IOException {
- if (writer.tryDeleteDocument(reader, docID) != -1) {
- return indexingGen.get();
- } else {
- return -1;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67c35aed/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
index a2541cb..d015ae9 100644
--- a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
+++ b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
@@ -23,16 +23,11 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.util.ThreadInterruptedException;
/** Utility class that runs a thread to manage periodic
* reopens of a {@link ReferenceManager}, with methods to wait for a specific
- * index changes to become visible. To use this class you
- * must first wrap your {@link IndexWriter} with a {@link
- * TrackingIndexWriter} and always use it to make changes
- * to the index, saving the returned generation. Then,
- * when a given search request needs to see a specific
+ * index changes to become visible. When a given search request needs to see a specific
* index change, call {@link #waitForGeneration} to wait for
* that change to be visible. Note that this will only
* scale well if most searches do not need to wait for a
@@ -44,7 +39,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
private final ReferenceManager<T> manager;
private final long targetMaxStaleNS;
private final long targetMinStaleNS;
- private final TrackingIndexWriter writer;
+ private final IndexWriter writer;
private volatile boolean finish;
private volatile long waitingGen;
private volatile long searchingGen;
@@ -69,7 +64,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
* is waiting for a specific generation to
* become visible.
*/
- public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
+ public ControlledRealTimeReopenThread(IndexWriter writer, ReferenceManager<T> manager, double targetMaxStaleSec, double targetMinStaleSec) {
if (targetMaxStaleSec < targetMinStaleSec) {
throw new IllegalArgumentException("targetMaxStaleSec (= " + targetMaxStaleSec + ") < targetMinStaleSec (=" + targetMinStaleSec + ")");
}
@@ -155,7 +150,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
* or false if maxMS wait time was exceeded
*/
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
- final long curGen = writer.getGeneration();
+ final long curGen = writer.getLastSequenceNumber();
if (targetGen > curGen) {
throw new IllegalArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")");
}
@@ -240,7 +235,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
// Save the gen as of when we started the reopen; the
// listener (HandleRefresh above) copies this to
// searchingGen once the reopen completes:
- refreshStartGen = writer.getAndIncrementGeneration();
+ refreshStartGen = writer.getLastSequenceNumber();
try {
manager.maybeRefreshBlocking();
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67c35aed/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java
index f9f1f0d..6ce519d 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java
@@ -79,8 +79,6 @@ public class TestTryDelete extends LuceneTestCase
ReferenceManager<IndexSearcher> mgr = new SearcherManager(writer,
new SearcherFactory());
- TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer);
-
IndexSearcher searcher = mgr.acquire();
TopDocs topDocs = searcher.search(new TermQuery(new Term("foo", "0")),
@@ -90,10 +88,10 @@ public class TestTryDelete extends LuceneTestCase
long result;
if (random().nextBoolean()) {
IndexReader r = DirectoryReader.open(writer);
- result = mgrWriter.tryDeleteDocument(r, 0);
+ result = writer.tryDeleteDocument(r, 0);
r.close();
} else {
- result = mgrWriter.tryDeleteDocument(searcher.getIndexReader(), 0);
+ result = writer.tryDeleteDocument(searcher.getIndexReader(), 0);
}
// The tryDeleteDocument should have succeeded:
@@ -132,10 +130,9 @@ public class TestTryDelete extends LuceneTestCase
100);
assertEquals(1, topDocs.totalHits);
- TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer);
- long result = mgrWriter.tryDeleteDocument(DirectoryReader.open(writer), 0);
+ long result = writer.tryDeleteDocument(DirectoryReader.open(writer), 0);
- assertEquals(1, result);
+ assertTrue(result != -1);
writer.commit();
@@ -175,11 +172,9 @@ public class TestTryDelete extends LuceneTestCase
100);
assertEquals(1, topDocs.totalHits);
- TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer);
- long result = mgrWriter.deleteDocuments(new TermQuery(new Term("foo",
- "0")));
+ long result = writer.deleteDocuments(new TermQuery(new Term("foo", "0")));
- assertEquals(1, result);
+ assertTrue(result != -1);
// writer.commit();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/67c35aed/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
index 2ef954c..69822a6 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
@@ -40,7 +40,6 @@ import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
-import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.IOUtils;
@@ -57,7 +56,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Is guaranteed to reflect deletes:
private SearcherManager nrtDeletes;
- private TrackingIndexWriter genWriter;
+ private IndexWriter genWriter;
private ControlledRealTimeReopenThread<IndexSearcher> nrtDeletesThread;
private ControlledRealTimeReopenThread<IndexSearcher> nrtNoDeletesThread;
@@ -219,7 +218,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
System.out.println("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec);
}
- genWriter = new TrackingIndexWriter(writer);
+ genWriter = writer;
final SearcherFactory sf = new SearcherFactory() {
@Override
@@ -311,9 +310,8 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch signal = new CountDownLatch(1);
- LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal);
- final TrackingIndexWriter writer = new TrackingIndexWriter(_writer);
- final SearcherManager manager = new SearcherManager(_writer, false, false, null);
+ LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal);
+ final SearcherManager manager = new SearcherManager(writer, false, false, null);
Document doc = new Document();
doc.add(newTextField("test", "test", Field.Store.YES));
writer.addDocument(doc);
@@ -334,7 +332,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
}
};
t.start();
- _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
+ writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through
final long lastGen = writer.updateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen
assertFalse(manager.isSearcherCurrent()); // false since there is a delete in the queue
@@ -373,7 +371,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
}
thread.close();
thread.join();
- _writer.close();
+ writer.close();
IOUtils.close(manager, d);
}
@@ -483,9 +481,8 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
final IndexWriter iw = new IndexWriter(dir, config);
SearcherManager sm = new SearcherManager(iw, new SearcherFactory());
- final TrackingIndexWriter tiw = new TrackingIndexWriter(iw);
ControlledRealTimeReopenThread<IndexSearcher> controlledRealTimeReopenThread =
- new ControlledRealTimeReopenThread<>(tiw, sm, maxStaleSecs, 0);
+ new ControlledRealTimeReopenThread<>(iw, sm, maxStaleSecs, 0);
controlledRealTimeReopenThread.setDaemon(true);
controlledRealTimeReopenThread.start();
@@ -517,7 +514,7 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
d.add(new TextField("count", i + "", Field.Store.NO));
d.add(new TextField("content", content, Field.Store.YES));
long start = System.currentTimeMillis();
- long l = tiw.addDocument(d);
+ long l = iw.addDocument(d);
controlledRealTimeReopenThread.waitForGeneration(l);
long wait = System.currentTimeMillis() - start;
assertTrue("waited too long for generation " + wait,
[12/15] lucene-solr:branch_6x: sequence numbers: improve test
debuggability, IW javadocs
Posted by mi...@apache.org.
sequence numbers: improve test debuggability, IW javadocs
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/4bdd08a9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/4bdd08a9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/4bdd08a9
Branch: refs/heads/branch_6x
Commit: 4bdd08a9a6d08974fdae94bd9ea584da23b5b0f7
Parents: 0b9c0dd
Author: Mike McCandless <mi...@apache.org>
Authored: Thu Jun 2 05:58:09 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 14:07:10 2016 -0400
----------------------------------------------------------------------
.../org/apache/lucene/index/IndexWriter.java | 4 +--
.../index/TestIndexingSequenceNumbers.java | 37 ++++++++------------
2 files changed, 17 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4bdd08a9/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 2bdfea7..b5e0c22 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2928,7 +2928,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* will internally call prepareCommit.
*
* @return The <a href="#sequence_number">sequence number</a>
- * last operation in the commit. All sequence numbers <= this value
+ * of the last operation in the commit. All sequence numbers <= this value
* will be reflected in the commit, and all others will not.
*/
@Override
@@ -3128,7 +3128,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* @see #prepareCommit
*
* @return The <a href="#sequence_number">sequence number</a>
- * last operation in the commit. All sequence numbers <= this value
+ * of the last operation in the commit. All sequence numbers <= this value
* will be reflected in the commit, and all others will not.
*/
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4bdd08a9/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index 23389dd..f4fc2f0 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -165,7 +165,8 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
Object commitLock = new Object();
final List<Operation> commits = new ArrayList<>();
- // multiple threads update the same set of documents, and we randomly commit
+ // multiple threads update the same set of documents, and we randomly commit, recording the commit seqNo and then opening each commit in
+ // the end to verify it reflects the correct updates
for(int i=0;i<threads.length;i++) {
final List<Operation> ops = new ArrayList<>();
threadOps.add(ops);
@@ -325,7 +326,8 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
List<Operation> ops1 = new ArrayList<>();
threadOps.add(ops1);
-
+
+ // pre-index every ID so none are missing:
for(int id=0;id<idCount;id++) {
int threadID = 0;
Operation op = new Operation();
@@ -340,7 +342,8 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
ops1.add(op);
}
- // multiple threads update the same set of documents, and we randomly commit
+ // multiple threads update the same set of documents, and we randomly commit, recording the commit seqNo and then opening each commit in
+ // the end to verify it reflects the correct updates
for(int i=0;i<threads.length;i++) {
final List<Operation> ops;
if (i == 0) {
@@ -430,31 +433,21 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
for(int id=0;id<idCount;id++) {
//System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1);
-
- if (expectedThreadIDs[id] != -1) {
- assertEquals(1, hits.totalHits);
- int actualThreadID = (int) docValues.get(id);
- if (expectedThreadIDs[id] != actualThreadID) {
- System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
- for(int threadID=0;threadID<threadOps.size();threadID++) {
- for(Operation op : threadOps.get(threadID)) {
- if (id == op.id) {
- System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "deleted"));
- }
- }
- }
- assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
- }
- } else if (hits.totalHits != 0) {
- System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
+
+ // We pre-add all ids up front:
+ assert expectedThreadIDs[id] != -1;
+ assertEquals(1, hits.totalHits);
+ int actualThreadID = (int) docValues.get(id);
+ if (expectedThreadIDs[id] != actualThreadID) {
+ System.out.println("FAIL: commit=" + i + " (of " + commits.size() + ") id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
- System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "del"));
+ System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo);
}
}
}
- assertEquals(0, hits.totalHits);
+ assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
}
}
w.close();
[15/15] lucene-solr:branch_6x: LUCENE-7302: move CHANGES entry to the
right section
Posted by mi...@apache.org.
LUCENE-7302: move CHANGES entry to the right section
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/00584579
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/00584579
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/00584579
Branch: refs/heads/branch_6x
Commit: 00584579b70041addbd47859012e25e67e079e10
Parents: a73a649
Author: Mike McCandless <mi...@apache.org>
Authored: Sat Jun 11 15:37:35 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Sat Jun 11 15:37:35 2016 -0400
----------------------------------------------------------------------
lucene/CHANGES.txt | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/00584579/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 350bee1..b577606 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -9,6 +9,12 @@ Bug Fixes
* LUCENE-6662: Fixed potential resource leaks. (Rishabh Patel via Adrien Grand)
+New Features
+
+* LUCENE-7302: IndexWriter methods that change the index now return a
+ long "sequence number" indicating the effective equivalent
+ single-threaded execution order (Mike McCandless)
+
Improvements
* LUCENE-7323: Compound file writing now verifies the incoming
@@ -45,10 +51,6 @@ New Features
applicable and supported when copying files from another FSDirectory in
Directory#copyFrom. (Simon Willnauer)
-* LUCENE-7302: IndexWriter methods that change the index now return a
- long "sequence number" indicating the effective equivalent
- single-threaded execution order (Mike McCandless)
-
API Changes
* LUCENE-7163: refactor GeoRect, Polygon, and GeoUtils tests to geo