You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/06/16 12:09:10 UTC

[40/50] [abbrv] lucene-solr:apiv2: LUCENE-7302: ensure IW.getMaxCompletedSequenceNumber only reflects a change after NRT reader refresh would also see it

LUCENE-7302: ensure IW.getMaxCompletedSequenceNumber only reflects a change after NRT reader refresh would also see it


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/5a032168
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/5a032168
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/5a032168

Branch: refs/heads/apiv2
Commit: 5a0321680fe5e57a17470b824024d5b56a4cbaa4
Parents: 843adfb
Author: Mike McCandless <mi...@apache.org>
Authored: Tue Jun 14 04:09:27 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Tue Jun 14 04:09:27 2016 -0400

----------------------------------------------------------------------
 .../apache/lucene/index/DocumentsWriter.java    | 40 +++++++++++++++-----
 .../index/DocumentsWriterPerThreadPool.java     |  3 ++
 .../org/apache/lucene/index/IndexWriter.java    | 10 ++---
 .../search/ControlledRealTimeReopenThread.java  |  3 +-
 .../TestControlledRealTimeReopenThread.java     | 20 +++++-----
 5 files changed, 49 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 13800a8..a33d640 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -122,7 +122,7 @@ final class DocumentsWriter implements Closeable, Accountable {
   final DocumentsWriterFlushControl flushControl;
   private final IndexWriter writer;
   private final Queue<Event> events;
-
+  private long lastSeqNo;
   
   DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory) {
     this.directoryOrig = directoryOrig;
@@ -144,6 +144,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     if (applyAllDeletes(deleteQueue)) {
       seqNo = -seqNo;
     }
+    lastSeqNo = Math.max(lastSeqNo, seqNo);
     return seqNo;
   }
 
@@ -158,6 +159,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     if (applyAllDeletes(deleteQueue)) {
       seqNo = -seqNo;
     }
+    lastSeqNo = Math.max(lastSeqNo, seqNo);
     return seqNo;
   }
 
@@ -168,7 +170,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     if (applyAllDeletes(deleteQueue)) {
       seqNo = -seqNo;
     }
-
+    lastSeqNo = Math.max(lastSeqNo, seqNo);
     return seqNo;
   }
   
@@ -317,6 +319,17 @@ final class DocumentsWriter implements Closeable, Accountable {
     }
   }
 
+  /** returns the maximum sequence number for all previously completed operations */
+  public long getMaxCompletedSequenceNumber() {
+    long value = lastSeqNo;
+    int limit = perThreadPool.getMaxThreadStates();
+    for(int i = 0; i < limit; i++) {
+      ThreadState perThread = perThreadPool.getThreadState(i);
+      value = Math.max(value, perThread.lastSeqNo);
+    }
+    return value;
+  }
+
   boolean anyChanges() {
     /*
      * changes are either in a DWPT or in the deleteQueue.
@@ -413,7 +426,7 @@ final class DocumentsWriter implements Closeable, Accountable {
 
     final ThreadState perThread = flushControl.obtainAndLock();
     final DocumentsWriterPerThread flushingDWPT;
-    final long seqNo;
+    long seqNo;
 
     try {
       // This must happen after we've pulled the ThreadState because IW.close
@@ -437,15 +450,18 @@ final class DocumentsWriter implements Closeable, Accountable {
       }
       final boolean isUpdate = delTerm != null;
       flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
+
+      assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
+      perThread.lastSeqNo = seqNo;
+
     } finally {
       perThreadPool.release(perThread);
     }
 
     if (postUpdate(flushingDWPT, hasEvents)) {
-      return -seqNo;
-    } else {
-      return seqNo;
+      seqNo = -seqNo;
     }
+    return seqNo;
   }
 
   long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
@@ -456,7 +472,7 @@ final class DocumentsWriter implements Closeable, Accountable {
     final ThreadState perThread = flushControl.obtainAndLock();
 
     final DocumentsWriterPerThread flushingDWPT;
-    final long seqNo;
+    long seqNo;
     try {
       // This must happen after we've pulled the ThreadState because IW.close
       // waits for all ThreadStates to be released:
@@ -479,15 +495,19 @@ final class DocumentsWriter implements Closeable, Accountable {
       }
       final boolean isUpdate = delTerm != null;
       flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
+
+      assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
+      perThread.lastSeqNo = seqNo;
+
     } finally {
       perThreadPool.release(perThread);
     }
 
     if (postUpdate(flushingDWPT, hasEvents)) {
-      return -seqNo;
-    } else {
-      return seqNo;
+      seqNo = -seqNo;
     }
+    
+    return seqNo;
   }
 
   private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 3802805..cc72342 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -59,6 +59,9 @@ final class DocumentsWriterPerThreadPool {
     // write access guarded by DocumentsWriterFlushControl
     long bytesUsed = 0;
 
+    // set by DocumentsWriter after each indexing op finishes
+    volatile long lastSeqNo;
+
     ThreadState(DocumentsWriterPerThread dpwt) {
       this.dwpt = dpwt;
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index b5e0c22..5fe1648 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1457,7 +1457,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
             changed();
           }
           //System.out.println("  yes " + info.info.name + " " + docID);
-
           return docWriter.deleteQueue.getNextSequenceNumber();
         }
       } else {
@@ -5049,12 +5048,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
     };
   }
 
-  /** Returns the last <a href="#sequence_number">sequence number</a>, or 0
-   *  if no index-changing operations have completed yet.
+  /** Returns the highest <a href="#sequence_number">sequence number</a> across
+   *  all completed operations, or 0 if no operations have finished yet.  Still
+   *  in-flight operations (in other threads) are not counted until they finish.
    *
    * @lucene.experimental */
-  public long getLastSequenceNumber() {
+  public long getMaxCompletedSequenceNumber() {
     ensureOpen();
-    return docWriter.deleteQueue.getLastSequenceNumber();
+    return docWriter.getMaxCompletedSequenceNumber();
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
index 466d793..a98a30d 100644
--- a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
+++ b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
@@ -150,7 +150,6 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
    *         or false if maxMS wait time was exceeded
    */
   public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
-    final long curGen = writer.getLastSequenceNumber();
     if (targetGen > searchingGen) {
       // Notify the reopen thread that the waitingGen has
       // changed, so it may wake up and realize it should
@@ -232,7 +231,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
       // Save the gen as of when we started the reopen; the
       // listener (HandleRefresh above) copies this to
       // searchingGen once the reopen completes:
-      refreshStartGen = writer.getLastSequenceNumber();
+      refreshStartGen = writer.getMaxCompletedSequenceNumber();
       try {
         manager.maybeRefreshBlocking();
       } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5a032168/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
index 69822a6..779c1f2 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
@@ -98,13 +98,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
     // Randomly verify the update "took":
     if (random().nextInt(20) == 2) {
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocuments " + id + " gen=" + gen);
       }
       nrtDeletesThread.waitForGeneration(gen);
       assertTrue(gen <= nrtDeletesThread.getSearchingGen());
       final IndexSearcher s = nrtDeletes.acquire();
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+        System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
       }
       try {
         assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
@@ -122,13 +122,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
     // Randomly verify the add "took":
     if (random().nextInt(20) == 2) {
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocuments " + id + " gen=" + gen);
       }
       nrtNoDeletesThread.waitForGeneration(gen);
       assertTrue(gen <= nrtNoDeletesThread.getSearchingGen());
       final IndexSearcher s = nrtNoDeletes.acquire();
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+        System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
       }
       try {
         assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
@@ -146,13 +146,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
     // Randomly verify the add "took":
     if (random().nextInt(20) == 2) {
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocument " + id + " gen=" + gen);
       }
       nrtNoDeletesThread.waitForGeneration(gen);
       assertTrue(gen <= nrtNoDeletesThread.getSearchingGen());
       final IndexSearcher s = nrtNoDeletes.acquire();
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+        System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
       }
       try {
         assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
@@ -169,13 +169,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
     // Randomly verify the udpate "took":
     if (random().nextInt(20) == 2) {
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocument " + id + " gen=" + gen);
       }
       nrtDeletesThread.waitForGeneration(gen);
       assertTrue(gen <= nrtDeletesThread.getSearchingGen());
       final IndexSearcher s = nrtDeletes.acquire();
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+        System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
       }
       try {
         assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
@@ -192,13 +192,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
     // randomly verify the delete "took":
     if (random().nextInt(20) == 7) {
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
+        System.out.println(Thread.currentThread().getName() + ": nrt: verify deleteDocuments " + id + " gen=" + gen);
       }
       nrtDeletesThread.waitForGeneration(gen);
       assertTrue(gen <= nrtDeletesThread.getSearchingGen());
       final IndexSearcher s = nrtDeletes.acquire();
       if (VERBOSE) {
-        System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+        System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
       }
       try {
         assertEquals(0, s.search(new TermQuery(id), 10).totalHits);