You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/06/05 09:51:14 UTC
[13/19] lucene-solr:master: sequence numbers: add test case for
updating numeric doc values
sequence numbers: add test case for updating numeric doc values
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/671de296
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/671de296
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/671de296
Branch: refs/heads/master
Commit: 671de29635400d7eaba6abe12160bb6f8671ebe8
Parents: 72115c8
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jun 1 18:50:14 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Wed Jun 1 18:50:14 2016 -0400
----------------------------------------------------------------------
.../index/TestIndexingSequenceNumbers.java | 184 +++++++++++++++++--
1 file changed, 172 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/671de296/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index 52c05d3..23389dd 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -17,16 +17,9 @@
package org.apache.lucene.index;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.search.IndexSearcher;
@@ -38,6 +31,14 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class TestIndexingSequenceNumbers extends LuceneTestCase {
public void testBasic() throws Exception {
@@ -206,7 +207,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
} else {
op.seqNo = w.updateDocument(idTerm, doc);
}
- op.what = 2;
+ op.what = 0;
}
ops.add(op);
}
@@ -249,7 +250,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
for(Operation op : threadOps.get(threadID)) {
if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
seqNos[op.id] = op.seqNo;
- if (op.what == 2) {
+ if (op.what == 0) {
expectedThreadIDs[op.id] = threadID;
} else {
expectedThreadIDs[op.id] = -1;
@@ -302,6 +303,167 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
dir.close();
}
+ public void testStressConcurrentDocValuesUpdatesCommit() throws Exception {
+ final int opCount = atLeast(10000);
+ final int idCount = TestUtil.nextInt(random(), 10, 1000);
+
+ Directory dir = newDirectory();
+ IndexWriterConfig iwc = newIndexWriterConfig();
+ iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+
+ // Cannot use RIW since it randomly commits:
+ final IndexWriter w = new IndexWriter(dir, iwc);
+
+ final int numThreads = TestUtil.nextInt(random(), 2, 10);
+ Thread[] threads = new Thread[numThreads];
+ //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length);
+ final CountDownLatch startingGun = new CountDownLatch(1);
+ List<List<Operation>> threadOps = new ArrayList<>();
+
+ Object commitLock = new Object();
+ final List<Operation> commits = new ArrayList<>();
+
+ List<Operation> ops1 = new ArrayList<>();
+ threadOps.add(ops1);
+
+ for(int id=0;id<idCount;id++) {
+ int threadID = 0;
+ Operation op = new Operation();
+ op.threadID = threadID;
+ op.id = id;
+
+ Document doc = new Document();
+ doc.add(new StoredField("thread", threadID));
+ doc.add(new NumericDocValuesField("thread", threadID));
+ doc.add(new StringField("id", "" + id, Field.Store.NO));
+ op.seqNo = w.addDocument(doc);
+ ops1.add(op);
+ }
+
+ // multiple threads update the same set of documents, and we randomly commit
+ for(int i=0;i<threads.length;i++) {
+ final List<Operation> ops;
+ if (i == 0) {
+ ops = threadOps.get(0);
+ } else {
+ ops = new ArrayList<>();
+ threadOps.add(ops);
+ }
+
+ final int threadID = i;
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ startingGun.await();
+ for(int i=0;i<opCount;i++) {
+ Operation op = new Operation();
+ op.threadID = threadID;
+ if (random().nextInt(500) == 17) {
+ op.what = 2;
+ synchronized(commitLock) {
+ op.seqNo = w.commit();
+ if (op.seqNo != -1) {
+ commits.add(op);
+ }
+ }
+ } else {
+ op.id = random().nextInt(idCount);
+ Term idTerm = new Term("id", "" + op.id);
+ op.seqNo = w.updateNumericDocValue(idTerm, "thread", threadID);
+ op.what = 0;
+ ops.add(op);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ threads[i].start();
+ }
+ startingGun.countDown();
+ for(Thread thread : threads) {
+ thread.join();
+ }
+
+ Operation commitOp = new Operation();
+ commitOp.seqNo = w.commit();
+ if (commitOp.seqNo != -1) {
+ commits.add(commitOp);
+ }
+
+ List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
+ assertEquals(commits.size(), indexCommits.size());
+
+ int[] expectedThreadIDs = new int[idCount];
+ long[] seqNos = new long[idCount];
+
+ //System.out.println("TEST: " + commits.size() + " commits");
+ for(int i=0;i<commits.size();i++) {
+ // this commit point should reflect all operations <= this seqNo
+ long commitSeqNo = commits.get(i).seqNo;
+ //System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
+
+ Arrays.fill(expectedThreadIDs, -1);
+ Arrays.fill(seqNos, 0);
+
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ long lastSeqNo = 0;
+ for(Operation op : threadOps.get(threadID)) {
+ if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
+ seqNos[op.id] = op.seqNo;
+ if (op.what == 0) {
+ expectedThreadIDs[op.id] = threadID;
+ }
+ }
+
+ assertTrue(op.seqNo > lastSeqNo);
+ lastSeqNo = op.seqNo;
+ }
+ }
+
+ DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
+ IndexSearcher s = new IndexSearcher(r);
+ NumericDocValues docValues = MultiDocValues.getNumericValues(r, "thread");
+
+ for(int id=0;id<idCount;id++) {
+ //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
+ TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1);
+
+ if (expectedThreadIDs[id] != -1) {
+ assertEquals(1, hits.totalHits);
+ int actualThreadID = (int) docValues.get(id);
+ if (expectedThreadIDs[id] != actualThreadID) {
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ for(Operation op : threadOps.get(threadID)) {
+ if (id == op.id) {
+ System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "deleted"));
+ }
+ }
+ }
+ assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
+ }
+ } else if (hits.totalHits != 0) {
+ System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
+ for(int threadID=0;threadID<threadOps.size();threadID++) {
+ for(Operation op : threadOps.get(threadID)) {
+ if (id == op.id) {
+ System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "del"));
+ }
+ }
+ }
+ assertEquals(0, hits.totalHits);
+ }
+ }
+ w.close();
+ r.close();
+ }
+
+ dir.close();
+ }
+
public void testStressConcurrentAddAndDeleteAndCommit() throws Exception {
final int opCount = atLeast(10000);
final int idCount = TestUtil.nextInt(random(), 10, 1000);
@@ -478,6 +640,4 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
w.close();
dir.close();
}
-
- // nocommit test doc values updates
}