You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/06/05 09:51:10 UTC
[09/19] lucene-solr:master: sequence numbers: always increment seq no
(even for addDocument/s); add tests; add javadocs; make DWDQ's seqNo private
sequence numbers: always increment seq no (even for addDocument/s); add tests; add javadocs; make DWDQ's seqNo private
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/59311a44
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/59311a44
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/59311a44
Branch: refs/heads/master
Commit: 59311a445c1552340ba25a0e3e45bac55b3abdbd
Parents: 34673ad
Author: Mike McCandless <mi...@apache.org>
Authored: Fri May 27 06:11:07 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Fri May 27 06:11:07 2016 -0400
----------------------------------------------------------------------
.../apache/lucene/index/DocumentsWriter.java | 2 +-
.../index/DocumentsWriterDeleteQueue.java | 22 ++-
.../index/DocumentsWriterFlushControl.java | 4 +-
.../lucene/index/DocumentsWriterPerThread.java | 4 +-
.../org/apache/lucene/index/IndexWriter.java | 90 +++++++++--
.../index/TestIndexingSequenceNumbers.java | 155 ++++++++++++++++++-
6 files changed, 249 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 9f1bdd3..5630fbb 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -259,7 +259,7 @@ final class DocumentsWriter implements Closeable, Accountable {
deleteQueue.clear();
// jump over any possible in flight ops:
- deleteQueue.seqNo.addAndGet(perThreadPool.getActiveThreadStateCount()+1);
+ deleteQueue.skipSequenceNumbers(perThreadPool.getActiveThreadStateCount()+1);
flushControl.abortPendingFlushes();
flushControl.waitForFlush();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
index 80d2c85..4a11599 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java
@@ -84,7 +84,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
final long generation;
/** Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */
- final AtomicLong seqNo;
+ private final AtomicLong nextSeqNo;
DocumentsWriterDeleteQueue() {
// seqNo must start at 1 because some APIs negate this to also return a boolean
@@ -98,7 +98,7 @@ final class DocumentsWriterDeleteQueue implements Accountable {
DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
this.globalBufferedUpdates = globalBufferedUpdates;
this.generation = generation;
- this.seqNo = new AtomicLong(startSeqNo);
+ this.nextSeqNo = new AtomicLong(startSeqNo);
/*
* we use a sentinel instance as our initial tail. No slice will ever try to
* apply this tail since the head is always omitted.
@@ -168,10 +168,10 @@ final class DocumentsWriterDeleteQueue implements Accountable {
/*
* now that we are done we need to advance the tail
*/
- long mySeqNo = seqNo.getAndIncrement();
+ long seqNo = getNextSequenceNumber();
boolean result = tailUpdater.compareAndSet(this, currentTail, newNode);
assert result;
- return mySeqNo;
+ return seqNo;
}
}
}
@@ -460,6 +460,16 @@ final class DocumentsWriterDeleteQueue implements Accountable {
public String toString() {
return "DWDQ: [ generation: " + generation + " ]";
}
-
-
+
+ public long getNextSequenceNumber() {
+ return nextSeqNo.getAndIncrement();
+ }
+
+ public long getLastSequenceNumber() {
+ return nextSeqNo.get()-1;
+ }
+
+ public void skipSequenceNumbers(long jump) {
+ nextSeqNo.addAndGet(jump);
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
index ffcb7dc..99bf8d8 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java
@@ -481,9 +481,9 @@ final class DocumentsWriterFlushControl implements Accountable {
// we do another full flush
//System.out.println("DWFC: fullFLush old seqNo=" + documentsWriter.deleteQueue.seqNo.get() + " activeThreadCount=" + perThreadPool.getActiveThreadStateCount());
- // Insert a gap in seqNo of current active thread count, in the worst case those threads now have one operation in flight. It's fine
+ // Insert a gap in seqNo of current active thread count; in the worst case each of those threads now has one operation in flight. It's fine
// if we have some sequence numbers that were never assigned:
- seqNo = documentsWriter.deleteQueue.seqNo.get() + perThreadPool.getActiveThreadStateCount();
+ seqNo = documentsWriter.deleteQueue.getLastSequenceNumber() + perThreadPool.getActiveThreadStateCount() + 2;
DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation+1, seqNo+1);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
index 5b1afa0..cf5694d 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
@@ -292,7 +292,7 @@ class DocumentsWriterPerThread {
deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
return seqNo;
} else {
- seqNo = deleteQueue.seqNo.get();
+ seqNo = deleteQueue.getNextSequenceNumber();
}
return seqNo;
@@ -328,7 +328,7 @@ class DocumentsWriterPerThread {
assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
} else {
applySlice &= deleteQueue.updateSlice(deleteSlice);
- seqNo = deleteQueue.seqNo.get();
+ seqNo = deleteQueue.getNextSequenceNumber();
}
if (applySlice) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index 945399c..2bdfea7 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -95,6 +95,14 @@ import org.apache.lucene.util.Version;
and then adds the entire document). When finished adding, deleting
and updating documents, {@link #close() close} should be called.</p>
+ <a name="sequence_number"></a>
+ <p>Each method that changes the index returns a {@code long} sequence number, which
+ expresses the effective order in which each change was applied.
+ {@link #commit} also returns a sequence number, describing which
+ changes are in the commit point and which are not. Sequence numbers
+ are transient (not saved into the index in any way) and only valid
+ within a single {@code IndexWriter} instance.</p>
+
<a name="flush"></a>
<p>These changes are buffered in memory and periodically
flushed to the {@link Directory} (during the above method
@@ -1288,6 +1296,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* replaced with the Unicode replacement character
* U+FFFD.</p>
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*/
@@ -1327,6 +1338,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* and will likely break them up. Use such tools at your
* own risk!
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
@@ -1344,6 +1358,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* See {@link #addDocuments(Iterable)}.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
*
@@ -1441,7 +1458,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
//System.out.println(" yes " + info.info.name + " " + docID);
- return docWriter.deleteQueue.seqNo.getAndIncrement();
+ return docWriter.deleteQueue.getNextSequenceNumber();
}
} else {
//System.out.println(" no rld " + info.info.name + " " + docID);
@@ -1458,6 +1475,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* terms. All given deletes are applied and flushed atomically
* at the same time.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @param terms array of terms to identify the documents
* to be deleted
* @throws CorruptIndexException if the index is corrupt
@@ -1484,6 +1504,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* Deletes the document(s) matching any of the provided queries.
* All given deletes are applied and flushed atomically at the same time.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @param queries array of queries to identify the documents
* to be deleted
* @throws CorruptIndexException if the index is corrupt
@@ -1522,6 +1545,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* by a reader on the same index (flush may happen only after
* the add).
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @param term the term to identify the document(s) to be
* deleted
* @param doc the document to be added
@@ -1566,6 +1592,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* field name of the {@link NumericDocValues} field
* @param value
* new value for the field
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@@ -1606,6 +1636,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* field name of the {@link BinaryDocValues} field
* @param value
* new value for the field
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@@ -1642,6 +1676,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* @param updates
* the updates to apply
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@@ -2256,6 +2294,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* threads are running {@link #forceMerge}, {@link #addIndexes(CodecReader[])}
* or {@link #forceMergeDeletes} methods, they may receive
* {@link MergePolicy.MergeAbortedException}s.
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
*/
public long deleteAll() throws IOException {
ensureOpen();
@@ -2304,7 +2345,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
globalFieldNumberMap.clear();
success = true;
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.getNextSequenceNumber();
} finally {
docWriter.unlockAllAfterAbortAll(this);
@@ -2542,6 +2583,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
*
* <p>This requires this index not be among those to be added.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException if the index is corrupt
* @throws IOException if there is a low-level IO error
* @throws IllegalArgumentException if addIndexes would cause
@@ -2559,6 +2603,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
boolean successTop = false;
+ long seqNo;
+
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush at addIndexes(Directory...)");
@@ -2630,6 +2676,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Now reserve the docs, just before we update SIS:
reserveDocs(totalMaxDoc);
+ seqNo = docWriter.deleteQueue.getNextSequenceNumber();
+
success = true;
} finally {
if (!success) {
@@ -2647,6 +2695,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(Directory...)");
+ // dead code but javac disagrees:
+ seqNo = -1;
} finally {
if (successTop) {
IOUtils.close(locks);
@@ -2656,8 +2706,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
maybeMerge();
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return seqNo;
}
/**
@@ -2682,6 +2731,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* {@code maxMergeAtOnce} parameter, you should pass that many readers in one
* call.
*
+ * @return The <a href="#sequence_number">sequence number</a>
+ * for this operation
+ *
* @throws CorruptIndexException
* if the index is corrupt
* @throws IOException
@@ -2697,6 +2749,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
Sort indexSort = config.getIndexSort();
+ long seqNo;
+
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "flush at addIndexes(CodecReader...)");
@@ -2731,8 +2785,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
rateLimiters.set(new MergeRateLimiter(null));
if (!merger.shouldMerge()) {
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.getNextSequenceNumber();
}
merger.merge(); // merge 'em
@@ -2751,8 +2804,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.getNextSequenceNumber();
}
ensureOpen();
useCompoundFile = mergePolicy.useCompoundFile(segmentInfos, infoPerCommit, this);
@@ -2788,8 +2840,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Safe: these files must exist
deleteNewFiles(infoPerCommit.files());
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return docWriter.deleteQueue.getNextSequenceNumber();
}
ensureOpen();
@@ -2797,15 +2848,17 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
reserveDocs(numDocs);
segmentInfos.add(infoPerCommit);
+ seqNo = docWriter.deleteQueue.getNextSequenceNumber();
checkpoint();
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "addIndexes(CodecReader...)");
+ // dead code but javac disagrees:
+ seqNo = -1;
}
maybeMerge();
- // no need to increment:
- return docWriter.deleteQueue.seqNo.get();
+ return seqNo;
}
/** Copies the segment files as-is into the IndexWriter's directory. */
@@ -2873,6 +2926,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* <p>You can also just call {@link #commit()} directly
* without prepareCommit first in which case that method
* will internally call prepareCommit.
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * of the last operation in the commit. All sequence numbers &lt;= this value
+ * will be reflected in the commit, and all others will not.
*/
@Override
public final long prepareCommit() throws IOException {
@@ -3069,6 +3126,10 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
* point, and all other operations will not. </p>
*
* @see #prepareCommit
+ *
+ * @return The <a href="#sequence_number">sequence number</a>
+ * of the last operation in the commit. All sequence numbers &lt;= this value
+ * will be reflected in the commit, and all others will not.
*/
@Override
public final long commit() throws IOException {
@@ -4988,11 +5049,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
};
}
- /** Returns the last sequence number.
+ /** Returns the last <a href="#sequence_number">sequence number</a>, or 0
+ * if no index-changing operations have completed yet.
*
* @lucene.experimental */
public long getLastSequenceNumber() {
ensureOpen();
- return docWriter.deleteQueue.seqNo.get()-1;
+ return docWriter.deleteQueue.getLastSequenceNumber();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/59311a44/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index fb9b9ab..002292c 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -43,7 +43,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
long a = w.addDocument(new Document());
long b = w.addDocument(new Document());
- assertTrue(b >= a);
+ assertTrue(b > a);
w.close();
dir.close();
}
@@ -129,7 +129,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
static class Operation {
- // 0 = update, 1 = delete, 2 = commit
+ // 0 = update, 1 = delete, 2 = commit, 3 = add
byte what;
int id;
int threadID;
@@ -248,7 +248,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
}
}
- assertTrue(op.seqNo >= lastSeqNo);
+ assertTrue(op.seqNo > lastSeqNo);
lastSeqNo = op.seqNo;
}
}
@@ -293,4 +293,153 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
dir.close();
}
+
+ public void testStressConcurrentAddAndDeleteAndCommit() throws Exception {
+ final int opCount = atLeast(10000);
+ final int idCount = TestUtil.nextInt(random(), 10, 1000);
+
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig();
+ iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+
+ // Cannot use RIW since it randomly commits:
+ final IndexWriter w = new IndexWriter(dir, iwc);
+
+ final int numThreads = TestUtil.nextInt(random(), 2, 5);
+ Thread[] threads = new Thread[numThreads];
+ //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
+ final CountDownLatch startingGun = new CountDownLatch(1);
+ List<List<Operation>> threadOps = new ArrayList<>();
+
+ Object commitLock = new Object();
+ final List<Operation> commits = new ArrayList<>();
+
+ // multiple threads add and delete documents over the same set of ids, and we randomly commit
+ for(int i=0;i<threads.length;i++) {
+ final List<Operation> ops = new ArrayList<>();
+ threadOps.add(ops);
+ final int threadID = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ startingGun.await();
+ for(int i=0;i<opCount;i++) {
+ Operation op = new Operation();
+ op.threadID = threadID;
+ if (random().nextInt(500) == 17) {
+ op.what = 2;
+ synchronized(commitLock) {
+ op.seqNo = w.commit();
+ if (op.seqNo != -1) {
+ commits.add(op);
+ }
+ }
+ } else {
+ op.id = random().nextInt(idCount);
+ Term idTerm = new Term("id", "" + op.id);
+ if (random().nextInt(10) == 1) {
+ op.what = 1;
+ if (random().nextBoolean()) {
+ op.seqNo = w.deleteDocuments(idTerm);
+ } else {
+ op.seqNo = w.deleteDocuments(new TermQuery(idTerm));
+ }
+ } else {
+ Document doc = new Document();
+ doc.add(new StoredField("thread", threadID));
+ doc.add(new StringField("id", "" + op.id, Field.Store.NO));
+ if (random().nextBoolean()) {
+ List<Document> docs = new ArrayList<>();
+ docs.add(doc);
+ op.seqNo = w.addDocuments(docs);
+ } else {
+ op.seqNo = w.addDocument(doc);
+ }
+ op.what = 3;
+ }
+ ops.add(op);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ threads[i].start();
+ }
+ startingGun.countDown();
+ for(Thread thread : threads) {
+ thread.join();
+ }
+
+ Operation commitOp = new Operation();
+ commitOp.seqNo = w.commit();
+ if (commitOp.seqNo != -1) {
+ commits.add(commitOp);
+ }
+
+ List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
+ assertEquals(commits.size(), indexCommits.size());
+
+ // how many docs with this id are expected:
+ int[] expectedCounts = new int[idCount];
+ long[] lastDelSeqNos = new long[idCount];
+
+ //System.out.println("TEST: " + commits.size() + " commits");
+ for(int i=0;i<commits.size();i++) {
+ // this commit point should reflect all operations <= this seqNo
+ long commitSeqNo = commits.get(i).seqNo;
+ //System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
+
+ // first find the highest seqNo of the last delete op, for each id, prior to this commit:
+ Arrays.fill(lastDelSeqNos, -1);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ long lastSeqNo = 0;
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.what == 1 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) {
+ lastDelSeqNos[op.id] = op.seqNo;
+ }
+
+ // within one thread the seqNos must only increase:
+ assertTrue(op.seqNo > lastSeqNo);
+ lastSeqNo = op.seqNo;
+ }
+ }
+
+ // then count how many adds happened since the last delete and before this commit:
+ Arrays.fill(expectedCounts, 0);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.what == 3 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) {
+ expectedCounts[op.id]++;
+ }
+ }
+ }
+
+ DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
+ IndexSearcher s = new IndexSearcher(r);
+
+ for(int id=0;id<idCount;id++) {
+ //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
+ assertEquals(expectedCounts[id], s.count(new TermQuery(new Term("id", ""+id))));
+ }
+ w.close();
+ r.close();
+ }
+
+ dir.close();
+ }
+
+ public void testDeleteAll() throws Exception {
+ Directory dir = newDirectory();
+ IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
+ long a = w.addDocument(new Document());
+ long b = w.deleteAll();
+ assertTrue(a < b);
+ long c = w.commit();
+ assertTrue(b < c);
+ w.close();
+ dir.close();
+ }
}