You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2016/06/14 08:13:51 UTC
lucene-solr:branch_6x: LUCENE-7302: ensure IW.getMaxCompletedSequenceNumber only reflects a change after NRT reader refresh would also see it
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6x 816b50202 -> 8ed16fd1f
LUCENE-7302: ensure IW.getMaxCompletedSequenceNumber only reflects a change after NRT reader refresh would also see it
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8ed16fd1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8ed16fd1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8ed16fd1
Branch: refs/heads/branch_6x
Commit: 8ed16fd1f9a03c66d4ac81ddaa7ab70359410b95
Parents: 816b502
Author: Mike McCandless <mi...@apache.org>
Authored: Tue Jun 14 04:09:27 2016 -0400
Committer: Mike McCandless <mi...@apache.org>
Committed: Tue Jun 14 04:10:00 2016 -0400
----------------------------------------------------------------------
.../apache/lucene/index/DocumentsWriter.java | 40 +++++++++++++++-----
.../index/DocumentsWriterPerThreadPool.java | 3 ++
.../org/apache/lucene/index/IndexWriter.java | 10 ++---
.../search/ControlledRealTimeReopenThread.java | 3 +-
.../TestControlledRealTimeReopenThread.java | 20 +++++-----
5 files changed, 49 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ed16fd1/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
index 13800a8..a33d640 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -122,7 +122,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final DocumentsWriterFlushControl flushControl;
private final IndexWriter writer;
private final Queue<Event> events;
-
+ private long lastSeqNo;
DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Directory directoryOrig, Directory directory) {
this.directoryOrig = directoryOrig;
@@ -144,6 +144,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo;
}
+ lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo;
}
@@ -158,6 +159,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo;
}
+ lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo;
}
@@ -168,7 +170,7 @@ final class DocumentsWriter implements Closeable, Accountable {
if (applyAllDeletes(deleteQueue)) {
seqNo = -seqNo;
}
-
+ lastSeqNo = Math.max(lastSeqNo, seqNo);
return seqNo;
}
@@ -317,6 +319,17 @@ final class DocumentsWriter implements Closeable, Accountable {
}
}
+ /** returns the maximum sequence number for all previously completed operations */
+ public long getMaxCompletedSequenceNumber() {
+ long value = lastSeqNo;
+ int limit = perThreadPool.getMaxThreadStates();
+ for(int i = 0; i < limit; i++) {
+ ThreadState perThread = perThreadPool.getThreadState(i);
+ value = Math.max(value, perThread.lastSeqNo);
+ }
+ return value;
+ }
+
boolean anyChanges() {
/*
* changes are either in a DWPT or in the deleteQueue.
@@ -413,7 +426,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
- final long seqNo;
+ long seqNo;
try {
// This must happen after we've pulled the ThreadState because IW.close
@@ -437,15 +450,18 @@ final class DocumentsWriter implements Closeable, Accountable {
}
final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
+
+ assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
+ perThread.lastSeqNo = seqNo;
+
} finally {
perThreadPool.release(perThread);
}
if (postUpdate(flushingDWPT, hasEvents)) {
- return -seqNo;
- } else {
- return seqNo;
+ seqNo = -seqNo;
}
+ return seqNo;
}
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
@@ -456,7 +472,7 @@ final class DocumentsWriter implements Closeable, Accountable {
final ThreadState perThread = flushControl.obtainAndLock();
final DocumentsWriterPerThread flushingDWPT;
- final long seqNo;
+ long seqNo;
try {
// This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
@@ -479,15 +495,19 @@ final class DocumentsWriter implements Closeable, Accountable {
}
final boolean isUpdate = delTerm != null;
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
+
+ assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
+ perThread.lastSeqNo = seqNo;
+
} finally {
perThreadPool.release(perThread);
}
if (postUpdate(flushingDWPT, hasEvents)) {
- return -seqNo;
- } else {
- return seqNo;
+ seqNo = -seqNo;
}
+
+ return seqNo;
}
private boolean doFlush(DocumentsWriterPerThread flushingDWPT) throws IOException, AbortingException {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ed16fd1/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
index 3802805..cc72342 100644
--- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
+++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThreadPool.java
@@ -59,6 +59,9 @@ final class DocumentsWriterPerThreadPool {
// write access guarded by DocumentsWriterFlushControl
long bytesUsed = 0;
+ // set by DocumentsWriter after each indexing op finishes
+ volatile long lastSeqNo;
+
ThreadState(DocumentsWriterPerThread dpwt) {
this.dwpt = dpwt;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ed16fd1/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
index b5e0c22..5fe1648 100644
--- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -1457,7 +1457,6 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
changed();
}
//System.out.println(" yes " + info.info.name + " " + docID);
-
return docWriter.deleteQueue.getNextSequenceNumber();
}
} else {
@@ -5049,12 +5048,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
};
}
- /** Returns the last <a href="#sequence_number">sequence number</a>, or 0
- * if no index-changing operations have completed yet.
+ /** Returns the highest <a href="#sequence_number">sequence number</a> across
+ * all completed operations, or 0 if no operations have finished yet. Still
+ * in-flight operations (in other threads) are not counted until they finish.
*
* @lucene.experimental */
- public long getLastSequenceNumber() {
+ public long getMaxCompletedSequenceNumber() {
ensureOpen();
- return docWriter.deleteQueue.getLastSequenceNumber();
+ return docWriter.getMaxCompletedSequenceNumber();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ed16fd1/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
index 466d793..a98a30d 100644
--- a/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
+++ b/lucene/core/src/java/org/apache/lucene/search/ControlledRealTimeReopenThread.java
@@ -150,7 +150,6 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
* or false if maxMS wait time was exceeded
*/
public synchronized boolean waitForGeneration(long targetGen, int maxMS) throws InterruptedException {
- final long curGen = writer.getLastSequenceNumber();
if (targetGen > searchingGen) {
// Notify the reopen thread that the waitingGen has
// changed, so it may wake up and realize it should
@@ -232,7 +231,7 @@ public class ControlledRealTimeReopenThread<T> extends Thread implements Closeab
// Save the gen as of when we started the reopen; the
// listener (HandleRefresh above) copies this to
// searchingGen once the reopen completes:
- refreshStartGen = writer.getLastSequenceNumber();
+ refreshStartGen = writer.getMaxCompletedSequenceNumber();
try {
manager.maybeRefreshBlocking();
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8ed16fd1/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
index 69822a6..779c1f2 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestControlledRealTimeReopenThread.java
@@ -98,13 +98,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Randomly verify the update "took":
if (random().nextInt(20) == 2) {
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocuments " + id + " gen=" + gen);
}
nrtDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtDeletesThread.getSearchingGen());
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
}
try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
@@ -122,13 +122,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Randomly verify the add "took":
if (random().nextInt(20) == 2) {
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocuments " + id + " gen=" + gen);
}
nrtNoDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtNoDeletesThread.getSearchingGen());
final IndexSearcher s = nrtNoDeletes.acquire();
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
}
try {
assertEquals(docs.size(), s.search(new TermQuery(id), 10).totalHits);
@@ -146,13 +146,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Randomly verify the add "took":
if (random().nextInt(20) == 2) {
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify addDocument " + id + " gen=" + gen);
}
nrtNoDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtNoDeletesThread.getSearchingGen());
final IndexSearcher s = nrtNoDeletes.acquire();
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ System.out.println(Thread.currentThread().getName() + ": nrt: got noDeletes searcher=" + s);
}
try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
@@ -169,13 +169,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// Randomly verify the udpate "took":
if (random().nextInt(20) == 2) {
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify " + id);
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify updateDocument " + id + " gen=" + gen);
}
nrtDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtDeletesThread.getSearchingGen());
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
}
try {
assertEquals(1, s.search(new TermQuery(id), 10).totalHits);
@@ -192,13 +192,13 @@ public class TestControlledRealTimeReopenThread extends ThreadedIndexingAndSearc
// randomly verify the delete "took":
if (random().nextInt(20) == 7) {
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: verify del " + id);
+ System.out.println(Thread.currentThread().getName() + ": nrt: verify deleteDocuments " + id + " gen=" + gen);
}
nrtDeletesThread.waitForGeneration(gen);
assertTrue(gen <= nrtDeletesThread.getSearchingGen());
final IndexSearcher s = nrtDeletes.acquire();
if (VERBOSE) {
- System.out.println(Thread.currentThread().getName() + ": nrt: got searcher=" + s);
+ System.out.println(Thread.currentThread().getName() + ": nrt: got deletes searcher=" + s);
}
try {
assertEquals(0, s.search(new TermQuery(id), 10).totalHits);