You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/06/05 09:51:14 UTC

[13/19] lucene-solr:master: sequence numbers: add test case for updating numeric doc values

sequence numbers: add test case for updating numeric doc values


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/671de296
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/671de296
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/671de296

Branch: refs/heads/master
Commit: 671de29635400d7eaba6abe12160bb6f8671ebe8
Parents: 72115c8
Author: Mike McCandless <mi...@apache.org>
Authored: Wed Jun 1 18:50:14 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Wed Jun 1 18:50:14 2016 -0400

----------------------------------------------------------------------
 .../index/TestIndexingSequenceNumbers.java      | 184 +++++++++++++++++--
 1 file changed, 172 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/671de296/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
index 52c05d3..23389dd 100644
--- a/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
+++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexingSequenceNumbers.java
@@ -17,16 +17,9 @@
 
 package org.apache.lucene.index;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.StoredField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.search.IndexSearcher;
@@ -38,6 +31,14 @@ import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class TestIndexingSequenceNumbers extends LuceneTestCase {
 
   public void testBasic() throws Exception {
@@ -206,7 +207,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
                     } else {
                       op.seqNo = w.updateDocument(idTerm, doc);
                     }
-                    op.what = 2;
+                    op.what = 0;
                   }
                   ops.add(op);
                 }
@@ -249,7 +250,7 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
         for(Operation op : threadOps.get(threadID)) {
           if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
             seqNos[op.id] = op.seqNo;
-            if (op.what == 2) {
+            if (op.what == 0) {
               expectedThreadIDs[op.id] = threadID;
             } else {
               expectedThreadIDs[op.id] = -1;
@@ -302,6 +303,167 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
     dir.close();
   }
 
+  /** Stress test: threads concurrently update the "thread" doc value of pre-added ids while randomly committing; every commit point must show, per id, the update with the highest seqNo &lt;= that commit's seqNo. */
+  public void testStressConcurrentDocValuesUpdatesCommit() throws Exception {
+    final int opCount = atLeast(10000);
+    final int idCount = TestUtil.nextInt(random(), 10, 1000);
+
+    Directory dir = newDirectory();
+    IndexWriterConfig iwc = newIndexWriterConfig();
+    iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+
+    // Cannot use RIW since it randomly commits:
+    final IndexWriter w = new IndexWriter(dir, iwc);
+
+    final int numThreads = TestUtil.nextInt(random(), 2, 10);
+    Thread[] threads = new Thread[numThreads];
+    final CountDownLatch startingGun = new CountDownLatch(1);
+    List<List<Operation>> threadOps = new ArrayList<>();
+
+    final Object commitLock = new Object();
+    final List<Operation> commits = new ArrayList<>();
+
+    List<Operation> ops1 = new ArrayList<>();
+    threadOps.add(ops1);
+    // pre-add every id (recorded as thread 0 ops) so each document exists before any update runs
+    for(int id=0;id<idCount;id++) {
+      int threadID = 0;
+      Operation op = new Operation();
+      op.threadID = threadID;
+      op.id = id;
+
+      Document doc = new Document();
+      doc.add(new StoredField("thread", threadID));
+      doc.add(new NumericDocValuesField("thread", threadID));
+      doc.add(new StringField("id", "" + id, Field.Store.NO));
+      op.seqNo = w.addDocument(doc);
+      ops1.add(op);
+    }
+
+    // multiple threads update the same set of documents, and we randomly commit
+    for(int i=0;i<threads.length;i++) {
+      final List<Operation> ops;
+      if (i == 0) {
+        ops = threadOps.get(0);
+      } else {
+        ops = new ArrayList<>();
+        threadOps.add(ops);
+      }
+
+      final int threadID = i;
+      threads[i] = new Thread() {
+          @Override
+          public void run() {
+            try {
+              startingGun.await();
+              for(int i=0;i<opCount;i++) {
+                Operation op = new Operation();
+                op.threadID = threadID;
+                if (random().nextInt(500) == 17) {
+                  op.what = 2;
+                  synchronized(commitLock) {
+                    op.seqNo = w.commit();
+                    if (op.seqNo != -1) {
+                      commits.add(op);
+                    }
+                  }
+                } else {
+                  op.id = random().nextInt(idCount);
+                  Term idTerm = new Term("id", "" + op.id);
+                  op.seqNo = w.updateNumericDocValue(idTerm, "thread", threadID);
+                  op.what = 0;
+                  ops.add(op);
+                }
+              }
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      threads[i].start();
+    }
+    startingGun.countDown();
+    for(Thread thread : threads) {
+      thread.join();
+    }
+
+    Operation commitOp = new Operation();
+    commitOp.seqNo = w.commit();
+    if (commitOp.seqNo != -1) {
+      commits.add(commitOp);
+    }
+
+    List<IndexCommit> indexCommits = DirectoryReader.listCommits(dir);
+    assertEquals(commits.size(), indexCommits.size());
+
+    int[] expectedThreadIDs = new int[idCount];
+    long[] seqNos = new long[idCount];
+
+    //System.out.println("TEST: " + commits.size() + " commits");
+    for(int i=0;i<commits.size();i++) {
+      // this commit point should reflect all operations <= this seqNo
+      long commitSeqNo = commits.get(i).seqNo;
+      //System.out.println("  commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i));
+
+      Arrays.fill(expectedThreadIDs, -1);
+      Arrays.fill(seqNos, 0);
+
+      for(int threadID=0;threadID<threadOps.size();threadID++) {
+        long lastSeqNo = 0;
+        for(Operation op : threadOps.get(threadID)) {
+          if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) {
+            seqNos[op.id] = op.seqNo;
+            if (op.what == 0) {
+              expectedThreadIDs[op.id] = threadID;
+            }
+          }
+
+          assertTrue(op.seqNo > lastSeqNo);
+          lastSeqNo = op.seqNo;
+        }
+      }
+
+      DirectoryReader r = DirectoryReader.open(indexCommits.get(i));
+      IndexSearcher s = new IndexSearcher(r);
+      NumericDocValues docValues = MultiDocValues.getNumericValues(r, "thread");
+
+      for(int id=0;id<idCount;id++) {
+        //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]);
+        TopDocs hits = s.search(new TermQuery(new Term("id", ""+id)), 1);
+        // read the value at the hit's docID: the logical id is not guaranteed to equal the docID
+        if (expectedThreadIDs[id] != -1) {
+          assertEquals(1, hits.totalHits);
+          int actualThreadID = (int) docValues.get(hits.scoreDocs[0].doc);
+          if (expectedThreadIDs[id] != actualThreadID) {
+            System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
+            for(int threadID=0;threadID<threadOps.size();threadID++) {
+              for(Operation op : threadOps.get(threadID)) {
+                if (id == op.id) {
+                  System.out.println("  threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 0 ? "updated" : "deleted"));
+                }
+              }
+            }
+            assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID);
+          }
+        } else if (hits.totalHits != 0) {
+          System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads);
+          for(int threadID=0;threadID<threadOps.size();threadID++) {
+            for(Operation op : threadOps.get(threadID)) {
+              if (id == op.id) {
+                System.out.println("  threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 0 ? "updated" : "del"));
+              }
+            }
+          }
+          assertEquals(0, hits.totalHits);
+        }
+      }
+      r.close();
+    }
+    // close the writer once, only after every commit point has been verified
+    w.close();
+    dir.close();
+  }
+
   public void testStressConcurrentAddAndDeleteAndCommit() throws Exception {
     final int opCount = atLeast(10000);
     final int idCount = TestUtil.nextInt(random(), 10, 1000);
@@ -478,6 +640,4 @@ public class TestIndexingSequenceNumbers extends LuceneTestCase {
     w.close();
     dir.close();
   }
-
-  // nocommit test doc values updates
 }