You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/06/05 09:51:12 UTC
[11/19] lucene-solr:master: sequence numbers: fix concurrency bug in add/updateDocuments; improve test debuggability; simplify delete queue concurrency
sequence numbers: fix concurrency bug in add/updateDocuments; improve test debuggability; simplify delete queue concurrency
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/76fb616b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/76fb616b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/76fb616b
Branch: refs/heads/master
Commit: 76fb616bdf591925caaa036e78dafa2e08d64846
Parents: 59311a4
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jun 1 18:27:45 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Wed Jun 1 18:27:45 2016 -0400
----------------------------------------------------------------------
.../apache/lucene/index/BufferedUpdates.java | 5 +-
.../apache/lucene/index/DocumentsWriter.java | 3 +-
.../index/DocumentsWriterDeleteQueue.java | 69 ++++++++++----------
.../index/DocumentsWriterFlushControl.java | 7 +-
.../lucene/index/DocumentsWriterPerThread.java | 22 +++++--
.../index/DocumentsWriterPerThreadPool.java | 1 +
.../index/TestDocumentsWriterDeleteQueue.java | 6 +-
.../lucene/index/TestIndexWriterConfig.java | 1 -
.../index/TestIndexingSequenceNumbers.java | 50 ++++++++++++--
9 files changed, 106 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
index b59c616..1c3494f 100644
--- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
+++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java
@@ -158,9 +158,12 @@ class BufferedUpdates {
private final static boolean VERBOSE_DELETES = false;
long gen;
+
+ final String segmentName;
- public BufferedUpdates() {
+ public BufferedUpdates(String segmentName) {
this.bytesUsed = new AtomicLong();
+ this.segmentName = segmentName;
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 5630fbb..13800a8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -626,7 +626,7 @@ final class DocumentsWriter implements Closeable, Accountable {
/* Cutover to a new delete queue. This must be synced on the flush control
* otherwise a new DWPT could sneak into the loop with an already flushing
* delete queue */
- seqNo = flushControl.markForFullFlush(); // swaps the delQueue synced on FlushControl
+ seqNo = flushControl.markForFullFlush(); // swaps this.deleteQueue synced on FlushControl
assert setFlushingDeleteQueue(flushingDeleteQueue);
}
assert currentFullFlushDelQueue != null;
@@ -676,7 +676,6 @@ final class DocumentsWriter implements Closeable, Accountable {
} finally {
pendingChangesInCurrentFullFlush = false;
}
-
}
public LiveIndexWriterConfig getIndexWriterConfig() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index 4a11599..dac2e4c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -69,12 +69,15 @@ import org.apache.lucene.util.BytesRef;
*/
final class DocumentsWriterDeleteQueue implements Accountable {
+ // the current end (latest delete operation) in the delete queue:
private volatile Node<?> tail;
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DocumentsWriterDeleteQueue,Node> tailUpdater = AtomicReferenceFieldUpdater
.newUpdater(DocumentsWriterDeleteQueue.class, Node.class, "tail");
+ /** Used to record deletes against all prior (already written to disk) segments. Whenever any segment flushes, we bundle up this set of
+ * deletes and insert into the buffered updates stream before the newly flushed segment(s). */
private final DeleteSlice globalSlice;
private final BufferedUpdates globalBufferedUpdates;
@@ -85,6 +88,9 @@ final class DocumentsWriterDeleteQueue implements Accountable {
/** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
private final AtomicLong nextSeqNo;
+
+ // for asserts
+ long maxSeqNo = Long.MAX_VALUE;
DocumentsWriterDeleteQueue() {
// seqNo must start at 1 because some APIs negate this to also return a boolean
@@ -92,7 +98,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
}
DocumentsWriterDeleteQueue(long generation, long startSeqNo) {
- this(new BufferedUpdates(), generation, startSeqNo);
+ this(new BufferedUpdates("global"), generation, startSeqNo);
}
DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
@@ -130,7 +136,6 @@ final class DocumentsWriterDeleteQueue implements Accountable {
*/
long add(Term term, DeleteSlice slice) {
final TermNode termNode = new TermNode(term);
-// System.out.println(Thread.currentThread().getName() + ": push " + termNode + " this=" + this);
long seqNo = add(termNode);
/*
* this is an update request where the term is the updated documents
@@ -150,31 +155,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
return seqNo;
}
- long add(Node<?> newNode) {
- /*
- * this non-blocking / 'wait-free' linked list add was inspired by Apache
- * Harmony's ConcurrentLinkedQueue Implementation.
- */
- while (true) {
- final Node<?> currentTail = tail;
- final Node<?> tailNext = currentTail.next;
- if (tail == currentTail && tailNext == null) {
- /*
- * we are in quiescent state and can try to insert the newNode to the
- * current tail if we fail to insert we just retry the operation since
- * somebody else has already added its newNode
- */
- if (currentTail.casNext(null, newNode)) {
- /*
- * now that we are done we need to advance the tail
- */
- long seqNo = getNextSequenceNumber();
- boolean result = tailUpdater.compareAndSet(this, currentTail, newNode);
- assert result;
- return seqNo;
- }
- }
- }
+ synchronized long add(Node<?> newNode) {
+ tail.next = newNode;
+ this.tail = newNode;
+ return getNextSequenceNumber();
}
boolean anyChanges() {
@@ -185,8 +169,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
* and if the global slice is up-to-date
* and if globalBufferedUpdates has changes
*/
- return globalBufferedUpdates.any() || !globalSlice.isEmpty() || globalSlice.sliceTail != tail
- || tail.next != null;
+ return globalBufferedUpdates.any() || !globalSlice.isEmpty() || globalSlice.sliceTail != tail || tail.next != null;
} finally {
globalBufferLock.unlock();
}
@@ -201,8 +184,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
* tail the next time we can get the lock!
*/
try {
- if (updateSlice(globalSlice)) {
-// System.out.println(Thread.currentThread() + ": apply globalSlice");
+ if (updateSliceNoSeqNo(globalSlice)) {
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
} finally {
@@ -231,7 +213,6 @@ final class DocumentsWriterDeleteQueue implements Accountable {
globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
}
-// System.out.println(Thread.currentThread().getName() + ": now freeze global buffer " + globalBufferedDeletes);
final FrozenBufferedUpdates packet = new FrozenBufferedUpdates(globalBufferedUpdates, false);
globalBufferedUpdates.clear();
return packet;
@@ -244,8 +225,21 @@ final class DocumentsWriterDeleteQueue implements Accountable {
return new DeleteSlice(tail);
}
- boolean updateSlice(DeleteSlice slice) {
- if (slice.sliceTail != tail) { // If we are the same just
+ /** Negative result means there were new deletes since we last applied */
+ synchronized long updateSlice(DeleteSlice slice) {
+ long seqNo = getNextSequenceNumber();
+ if (slice.sliceTail != tail) {
+ // new deletes arrived since we last checked
+ slice.sliceTail = tail;
+ seqNo = -seqNo;
+ }
+ return seqNo;
+ }
+
+ /** Just like updateSlice, but does not assign a sequence number */
+ boolean updateSliceNoSeqNo(DeleteSlice slice) {
+ if (slice.sliceTail != tail) {
+ // new deletes arrived since we last checked
slice.sliceTail = tail;
return true;
}
@@ -283,7 +277,6 @@ final class DocumentsWriterDeleteQueue implements Accountable {
current = current.next;
assert current != null : "slice property violated between the head on the tail must not be a null node";
current.apply(del, docIDUpto);
-// System.out.println(Thread.currentThread().getName() + ": pull " + current + " docIDUpto=" + docIDUpto);
} while (current != sliceTail);
reset();
}
@@ -462,13 +455,17 @@ final class DocumentsWriterDeleteQueue implements Accountable {
}
public long getNextSequenceNumber() {
- return nextSeqNo.getAndIncrement();
+ long seqNo = nextSeqNo.getAndIncrement();
+ assert seqNo < maxSeqNo: "seqNo=" + seqNo + " vs maxSeqNo=" + maxSeqNo;
+ return seqNo;
}
public long getLastSequenceNumber() {
return nextSeqNo.get()-1;
}
+ /** Inserts a gap in the sequence numbers. This is used by IW during flush or commit to ensure any in-flight threads get sequence numbers
+ * inside the gap */
public void skipSequenceNumbers(long jump) {
nextSeqNo.addAndGet(jump);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index 99bf8d8..a8c1dc3 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -190,7 +190,7 @@ final class DocumentsWriterFlushControl implements Accountable {
flushingDWPT = null;
}
} else {
- flushingDWPT = tryCheckoutForFlush(perThread);
+ flushingDWPT = tryCheckoutForFlush(perThread);
}
return flushingDWPT;
} finally {
@@ -452,8 +452,7 @@ final class DocumentsWriterFlushControl implements Accountable {
.currentThread(), documentsWriter);
boolean success = false;
try {
- if (perThread.isInitialized()
- && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
+ if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
// There is a flush-all in process and this DWPT is
// now stale -- enroll it for flush and try for
// another DWPT:
@@ -479,11 +478,11 @@ final class DocumentsWriterFlushControl implements Accountable {
flushingQueue = documentsWriter.deleteQueue;
// Set a new delete queue - all subsequent DWPT will use this queue until
// we do another full flush
- //System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
// Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
+ flushingQueue.maxSeqNo = seqNo+1;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index cf5694d..e72145c 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -171,7 +171,7 @@ class DocumentsWriterPerThread {
this.pendingNumDocs = pendingNumDocs;
bytesUsed = Counter.newCounter();
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
- pendingUpdates = new BufferedUpdates();
+ pendingUpdates = new BufferedUpdates(segmentName);
intBlockAllocator = new IntBlockAllocator(bytesUsed);
this.deleteQueue = deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
@@ -278,7 +278,8 @@ class DocumentsWriterPerThread {
numDocsInRAM++;
}
}
- finishDocument(null);
+
+ numDocsInRAM++;
}
allDocsIndexed = true;
@@ -292,7 +293,13 @@ class DocumentsWriterPerThread {
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
return seqNo;
} else {
- seqNo = deleteQueue.getNextSequenceNumber();
+ seqNo = deleteQueue.updateSlice(deleteSlice);
+ if (seqNo < 0) {
+ seqNo = -seqNo;
+ deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
+ } else {
+ deleteSlice.reset();
+ }
}
return seqNo;
@@ -327,8 +334,13 @@ class DocumentsWriterPerThread {
seqNo = deleteQueue.add(delTerm, deleteSlice);
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
- applySlice &= deleteQueue.updateSlice(deleteSlice);
- seqNo = deleteQueue.getNextSequenceNumber();
+ seqNo = deleteQueue.updateSlice(deleteSlice);
+
+ if (seqNo < 0) {
+ seqNo = -seqNo;
+ } else {
+ applySlice = false;
+ }
}
if (applySlice) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 0b0ac84..87310fb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -226,6 +226,7 @@ final class DocumentsWriterPerThreadPool {
return threadStates.get(ord);
}
+ // TODO: merge this with getActiveThreadStateCount: they are the same!
synchronized int getMaxThreadStates() {
return threadStates.size();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
index 51e17cf..c60f54d 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterDeleteQueue.java
@@ -43,8 +43,8 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
}
DeleteSlice slice1 = queue.newSlice();
DeleteSlice slice2 = queue.newSlice();
- BufferedUpdates bd1 = new BufferedUpdates();
- BufferedUpdates bd2 = new BufferedUpdates();
+ BufferedUpdates bd1 = new BufferedUpdates("bd1");
+ BufferedUpdates bd2 = new BufferedUpdates("bd2");
int last1 = 0;
int last2 = 0;
Set<Term> uniqueValues = new HashSet<>();
@@ -225,7 +225,7 @@ public class TestDocumentsWriterDeleteQueue extends LuceneTestCase {
this.index = index;
this.ids = ids;
this.slice = queue.newSlice();
- deletes = new BufferedUpdates();
+ deletes = new BufferedUpdates("deletes");
this.latch = latch;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
index 2ffdce7..ec033d4 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterConfig.java
@@ -93,7 +93,6 @@ public class TestIndexWriterConfig extends LuceneTestCase {
getters.add("getIndexingChain");
getters.add("getMergedSegmentWarmer");
getters.add("getMergePolicy");
- getters.add("getMaxThreadStates");
getters.add("getReaderPooling");
getters.add("getIndexerThreadPool");
getters.add("getFlushPolicy");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/76fb616b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index 002292c..52c05d3 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -30,9 +30,11 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
@@ -91,7 +93,13 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
doc.add(new StringField("id", "id", Field.Store.NO));
startingGun.await();
for(int j=0;j<100;j++) {
- seqNos[threadID] = w.updateDocument(id, doc);
+ if (random().nextBoolean()) {
+ seqNos[threadID] = w.updateDocument(id, doc);
+ } else {
+ List<Document> docs = new ArrayList<>();
+ docs.add(doc);
+ seqNos[threadID] = w.updateDocuments(id, docs);
+ }
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -147,7 +155,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
// Cannot use RIW since it randomly commits:
final IndexWriter w = new IndexWriter(dir, iwc);
- final int numThreads = TestUtil.nextInt(random(), 2, 5);
+ final int numThreads = TestUtil.nextInt(random(), 2, 10);
Thread[] threads = new Thread[numThreads];
//System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
final CountDownLatch startingGun = new CountDownLatch(1);
@@ -265,7 +273,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
Document doc = r.document(hits.scoreDocs[0].doc);
int actualThreadID = doc.getField("thread").numericValue().intValue();
if (expectedThreadIDs[id] != actualThreadID) {
- System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID);
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
@@ -276,7 +284,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
}
} else if (hits.totalHits != 0) {
- System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits);
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
for(int threadID=0;threadID<threadOps.size();threadID++) {
for(Operation op : threadOps.get(threadID)) {
if (id == op.id) {
@@ -347,7 +355,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
} else {
Document doc = new Document();
- doc.add(new StoredField("thread", threadID));
+ doc.add(new StoredField("threadop", threadID + "-" + ops.size()));
doc.add(new StringField("id", "" + op.id, Field.Store.NO));
if (random().nextBoolean()) {
List<Document> docs = new ArrayList<>();
@@ -366,6 +374,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
}
};
+ threads[i].setName("thread" + threadID);
threads[i].start();
}
startingGun.countDown();
@@ -422,7 +431,34 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
for(int id=0;id<idCount;id++) {
//System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
- assertEquals(expectedCounts[id], s.count(new TermQuery(new Term("id", ""+id))));
+ int actualCount = s.count(new TermQuery(new Term("id", ""+id)));
+ if (expectedCounts[id] != actualCount) {
+ System.out.println("TEST: FAIL r=" + r + " id=" + id + " commitSeqNo=" + commitSeqNo);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ int opCount2 = 0;
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.id == id) {
+ boolean shouldCount = op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id];
+ System.out.println(" id=" + id + " what=" + op.what + " threadop=" + threadID + "-" + opCount2 + " seqNo=" + op.seqNo + " vs lastDelSeqNo=" + lastDelSeqNos[op.id] + " shouldCount=" + shouldCount);
+ }
+ opCount2++;
+ }
+ }
+ TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1+actualCount);
+ for(ScoreDoc hit : hits.scoreDocs) {
+ System.out.println(" hit: " + s.doc(hit.doc).get("threadop"));
+ }
+
+ for(LeafReaderContext ctx : r.leaves()) {
+ System.out.println(" sub=" + ctx.reader());
+ Bits liveDocs = ctx.reader().getLiveDocs();
+ for(int docID=0;docID<ctx.reader().maxDoc();docID++) {
+ System.out.println(" docID=" + docID + " threadop=" + ctx.reader().document(docID).get("threadop") + (liveDocs != null && liveDocs.get(docID) == false ? " (deleted)" : ""));
+ }
+ }
+
+ assertEquals("commit " + i + " of " + commits.size() + " id=" + id + " reader=" + r, expectedCounts[id], actualCount);
+ }
}
w.close();
r.close();
@@ -442,4 +478,6 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
w.close();
dir.close();
}
+
+ // nocommit test doc values updates
}