You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2015/01/24 16:18:12 UTC
svn commit: r1654538 - in /lucene/dev/branches/lucene_solr_5_0: ./ lucene/
lucene/codecs/ lucene/codecs/src/java/org/apache/lucene/codecs/memory/
lucene/core/ lucene/core/src/java/org/apache/lucene/index/
lucene/core/src/test/org/apache/lucene/index/ l...
Author: mikemccand
Date: Sat Jan 24 15:18:12 2015
New Revision: 1654538
URL: http://svn.apache.org/r1654538
Log:
LUCENE-6197: CMS should not stall its own merge threads
Modified:
lucene/dev/branches/lucene_solr_5_0/ (props changed)
lucene/dev/branches/lucene_solr_5_0/lucene/ (props changed)
lucene/dev/branches/lucene_solr_5_0/lucene/codecs/ (props changed)
lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java
lucene/dev/branches/lucene_solr_5_0/lucene/core/ (props changed)
lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/ (props changed)
lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
Modified: lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java Sat Jan 24 15:18:12 2015
@@ -305,6 +305,7 @@ class MemoryDocValuesConsumer extends Do
int maxLength = Integer.MIN_VALUE;
final long startFP = data.getFilePointer();
boolean missing = false;
+ int upto = 0;
for(BytesRef v : values) {
final int length;
if (v == null) {
@@ -314,8 +315,9 @@ class MemoryDocValuesConsumer extends Do
length = v.length;
}
if (length > MemoryDocValuesFormat.MAX_BINARY_FIELD_LENGTH) {
- throw new IllegalArgumentException("DocValuesField \"" + field.name + "\" is too large, must be <= " + MemoryDocValuesFormat.MAX_BINARY_FIELD_LENGTH);
+ throw new IllegalArgumentException("DocValuesField \"" + field.name + "\" is too large, must be <= " + MemoryDocValuesFormat.MAX_BINARY_FIELD_LENGTH + " but got length=" + length + " v=" + v + "; upto=" + upto + " values=" + values);
}
+ upto++;
minLength = Math.min(minLength, length);
maxLength = Math.max(maxLength, length);
if (v != null) {
Modified: lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Sat Jan 24 15:18:12 2015
@@ -214,9 +214,18 @@ public class ConcurrentMergeScheduler ex
return maxMergeCount;
}
- synchronized void removeMergeThread(MergeThread thread) {
- boolean result = mergeThreads.remove(thread);
- assert result;
+ /** Removes the calling thread from the active merge threads. */
+ synchronized void removeMergeThread() {
+ Thread currentThread = Thread.currentThread();
+ // Paranoia: don't trust Thread.equals:
+ for(int i=0;i<mergeThreads.size();i++) {
+ if (mergeThreads.get(i) == currentThread) {
+ mergeThreads.remove(i);
+ return;
+ }
+ }
+
+ assert false: "merge thread " + currentThread + " was not found";
}
/**
@@ -392,15 +401,16 @@ public class ConcurrentMergeScheduler ex
}
/**
- * Returns the number of merge threads that are alive. Note that this number
- * is &le; {@link #mergeThreads} size.
+ * Returns the number of merge threads that are alive, ignoring the calling thread
+ * if it is a merge thread. Note that this number is &le; {@link #mergeThreads} size.
*
* @lucene.internal
*/
public synchronized int mergeThreadCount() {
+ Thread currentThread = Thread.currentThread();
int count = 0;
for (MergeThread mergeThread : mergeThreads) {
- if (mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
+ if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
count++;
}
}
@@ -436,7 +446,9 @@ public class ConcurrentMergeScheduler ex
// pending merges, until it's empty:
while (true) {
- maybeStall(writer);
+ if (maybeStall(writer) == false) {
+ break;
+ }
OneMerge merge = writer.getNextMerge();
if (merge == null) {
@@ -481,11 +493,16 @@ public class ConcurrentMergeScheduler ex
* many segments for merging to keep up, to wait until merges catch
* up. Applications that can take other less drastic measures, such
* as limiting how many threads are allowed to index, can do nothing
- * here and throttle elsewhere. */
+ * here and throttle elsewhere.
+ *
+ * If this method wants to stall but the calling thread is a merge
+ * thread, it should return false to tell caller not to kick off
+ * any new merges. */
- protected synchronized void maybeStall(IndexWriter writer) {
+ protected synchronized boolean maybeStall(IndexWriter writer) {
long startStallTime = 0;
while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
+
// This means merging has fallen too far behind: we
// have already created maxMergeCount threads, and
// now there's at least one more merge pending.
@@ -495,22 +512,35 @@ public class ConcurrentMergeScheduler ex
// updateMergeThreads). We stall this producer
// thread to prevent creation of new segments,
// until merging has caught up:
+
+ if (mergeThreads.contains(Thread.currentThread())) {
+ // Never stall a merge thread since this blocks the thread from
+ // finishing and calling updateMergeThreads, and blocking it
+ // accomplishes nothing anyway (it's not really a segment producer):
+ return false;
+ }
+
if (verbose() && startStallTime == 0) {
message(" too many merges; stalling...");
}
startStallTime = System.currentTimeMillis();
- try {
- // Only wait 0.25 seconds, so if all merges are aborted (by IW.rollback) we notice:
- wait(250);
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
+ doStall();
}
- if (verbose()) {
- if (startStallTime != 0) {
- message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
- }
+ if (verbose() && startStallTime != 0) {
+ message(" stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+ }
+
+ return true;
+ }
+
+ /** Called from {@link #maybeStall} to pause the calling thread for a bit. */
+ protected synchronized void doStall() {
+ try {
+ // Defensively wait for only .25 seconds in case we are missing a .notify/All somewhere:
+ wait(250);
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
}
}
@@ -560,8 +590,6 @@ public class ConcurrentMergeScheduler ex
message(" merge thread: done");
}
- removeMergeThread(this);
-
// Let CMS run new merges if necessary:
try {
merge(writer, MergeTrigger.MERGE_FINISHED, true);
@@ -583,6 +611,8 @@ public class ConcurrentMergeScheduler ex
} finally {
synchronized(ConcurrentMergeScheduler.this) {
+ removeMergeThread();
+
updateMergeThreads();
// In case we had stalled indexing, we can now wake up
Modified: lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java Sat Jan 24 15:18:12 2015
@@ -27,11 +27,13 @@ public enum MergeTrigger {
* Merge was triggered by a segment flush.
*/
SEGMENT_FLUSH,
+
/**
* Merge was triggered by a full flush. Full flushes
* can be caused by a commit, NRT reader reopen or a close call on the index writer.
*/
FULL_FLUSH,
+
/**
* Merge has been triggered explicitly by the user.
*/
Modified: lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Sat Jan 24 15:18:12 2015
@@ -460,7 +460,6 @@ public class TestConcurrentMergeSchedule
// No merges should have run so far, because TMP has high segmentsPerTier:
assertEquals(0, maxRunningMergeCount.get());
-
w.forceMerge(1);
// At most 5 merge threads should have launched at once:
@@ -489,8 +488,9 @@ public class TestConcurrentMergeSchedule
IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
@Override
- protected void maybeStall(IndexWriter writer) {
+ protected boolean maybeStall(IndexWriter writer) {
wasCalled.set(true);
+ return true;
}
});
IndexWriter w = new IndexWriter(dir, iwc);
@@ -640,4 +640,42 @@ public class TestConcurrentMergeSchedule
assertTrue(threadCount <= 4);
assertEquals(5+threadCount, cms.getMaxMergeCount());
}
+
+ // LUCENE-6197
+ public void testNoStallMergeThreads() throws Exception {
+ MockDirectoryWrapper dir = newMockDirectory();
+
+ IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
+ iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+ iwc.setMaxBufferedDocs(2);
+ IndexWriter w = new IndexWriter(dir, iwc);
+ for(int i=0;i<1000;i++) {
+ Document doc = new Document();
+ doc.add(newStringField("field", ""+i, Field.Store.YES));
+ w.addDocument(doc);
+ }
+ w.close();
+
+ iwc = newIndexWriterConfig(new MockAnalyzer(random()));
+ final AtomicBoolean failed = new AtomicBoolean();
+ ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
+ @Override
+ protected void doStall() {
+ if (Thread.currentThread().getName().startsWith("Lucene Merge Thread")) {
+ failed.set(true);
+ }
+ super.doStall();
+ }
+ };
+ cms.setMaxMergesAndThreads(2, 1);
+ iwc.setMergeScheduler(cms);
+ iwc.setMaxBufferedDocs(2);
+
+ w = new IndexWriter(dir, iwc);
+ w.forceMerge(1);
+ w.close();
+ dir.close();
+
+ assertFalse(failed.get());
+ }
}
Modified: lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java Sat Jan 24 15:18:12 2015
@@ -58,8 +58,8 @@ import java.util.logging.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
@@ -78,8 +78,8 @@ import org.apache.lucene.index.FieldInfo
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexOptions;
-import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexReader.ReaderClosedListener;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
@@ -107,8 +107,8 @@ import org.apache.lucene.index.SortedDoc
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.Terms;
-import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.TermsEnum.SeekStatus;
+import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.AssertingIndexSearcher;
import org.apache.lucene.search.DocIdSet;
@@ -125,8 +125,8 @@ import org.apache.lucene.store.FlushInfo
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
+import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.store.NRTCachingDirectory;
import org.apache.lucene.util.automaton.AutomatonTestUtil;
import org.apache.lucene.util.automaton.CompiledAutomaton;
@@ -142,7 +142,6 @@ import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
-
import com.carrotsearch.randomizedtesting.JUnit4MethodProvider;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.MixWithSuiteName;
@@ -153,16 +152,16 @@ import com.carrotsearch.randomizedtestin
import com.carrotsearch.randomizedtesting.annotations.SeedDecorators;
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
import com.carrotsearch.randomizedtesting.annotations.TestMethodProviders;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction.Action;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup.Group;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies.Consequence;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.rules.NoClassHooksShadowingRule;
@@ -912,7 +911,8 @@ public abstract class LuceneTestCase ext
} else {
cms = new ConcurrentMergeScheduler() {
@Override
- protected synchronized void maybeStall(IndexWriter writer) {
+ protected synchronized boolean maybeStall(IndexWriter writer) {
+ return true;
}
};
}