You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2015/01/24 16:18:12 UTC

svn commit: r1654538 - in /lucene/dev/branches/lucene_solr_5_0: ./ lucene/ lucene/codecs/ lucene/codecs/src/java/org/apache/lucene/codecs/memory/ lucene/core/ lucene/core/src/java/org/apache/lucene/index/ lucene/core/src/test/org/apache/lucene/index/ lucene/test-framework/ lucene/test-framework/src/java/org/apache/lucene/util/

Author: mikemccand
Date: Sat Jan 24 15:18:12 2015
New Revision: 1654538

URL: http://svn.apache.org/r1654538
Log:
LUCENE-6197: CMS should not stall its own merge threads

Modified:
    lucene/dev/branches/lucene_solr_5_0/   (props changed)
    lucene/dev/branches/lucene_solr_5_0/lucene/   (props changed)
    lucene/dev/branches/lucene_solr_5_0/lucene/codecs/   (props changed)
    lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java
    lucene/dev/branches/lucene_solr_5_0/lucene/core/   (props changed)
    lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
    lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
    lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
    lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/   (props changed)
    lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java

Modified: lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/codecs/src/java/org/apache/lucene/codecs/memory/MemoryDocValuesConsumer.java Sat Jan 24 15:18:12 2015
@@ -305,6 +305,7 @@ class MemoryDocValuesConsumer extends Do
     int maxLength = Integer.MIN_VALUE;
     final long startFP = data.getFilePointer();
     boolean missing = false;
+    int upto = 0;
     for(BytesRef v : values) {
       final int length;
       if (v == null) {
@@ -314,8 +315,9 @@ class MemoryDocValuesConsumer extends Do
         length = v.length;
       }
       if (length > MemoryDocValuesFormat.MAX_BINARY_FIELD_LENGTH) {
-        throw new IllegalArgumentException("DocValuesField \"" + field.name + "\" is too large, must be <= " + MemoryDocValuesFormat.MAX_BINARY_FIELD_LENGTH);
+        throw new IllegalArgumentException("DocValuesField \"" + field.name + "\" is too large, must be <= " + MemoryDocValuesFormat.MAX_BINARY_FIELD_LENGTH + " but got length=" + length + " v=" + v + "; upto=" + upto + " values=" + values);
       }
+      upto++;
       minLength = Math.min(minLength, length);
       maxLength = Math.max(maxLength, length);
       if (v != null) {

Modified: lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Sat Jan 24 15:18:12 2015
@@ -214,9 +214,18 @@ public class ConcurrentMergeScheduler ex
     return maxMergeCount;
   }
 
-  synchronized void removeMergeThread(MergeThread thread) {
-    boolean result = mergeThreads.remove(thread);
-    assert result;
+  /** Removes the calling thread from the active merge threads. */
+  synchronized void removeMergeThread() {
+    Thread currentThread = Thread.currentThread();
+    // Paranoia: don't trust Thread.equals:
+    for(int i=0;i<mergeThreads.size();i++) {
+      if (mergeThreads.get(i) == currentThread) {
+        mergeThreads.remove(i);
+        return;
+      }
+    }
+      
+    assert false: "merge thread " + currentThread + " was not found";
   }
 
   /**
@@ -392,15 +401,16 @@ public class ConcurrentMergeScheduler ex
   }
 
   /**
-   * Returns the number of merge threads that are alive. Note that this number
-   * is &le; {@link #mergeThreads} size.
+   * Returns the number of merge threads that are alive, ignoring the calling thread
+   * if it is a merge thread.  Note that this number is &le; {@link #mergeThreads} size.
    *
    * @lucene.internal
    */
   public synchronized int mergeThreadCount() {
+    Thread currentThread = Thread.currentThread();
     int count = 0;
     for (MergeThread mergeThread : mergeThreads) {
-      if (mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
+      if (currentThread != mergeThread && mergeThread.isAlive() && mergeThread.merge.rateLimiter.getAbort() == false) {
         count++;
       }
     }
@@ -436,7 +446,9 @@ public class ConcurrentMergeScheduler ex
     // pending merges, until it's empty:
     while (true) {
 
-      maybeStall(writer);
+      if (maybeStall(writer) == false) {
+        break;
+      }
 
       OneMerge merge = writer.getNextMerge();
       if (merge == null) {
@@ -481,11 +493,16 @@ public class ConcurrentMergeScheduler ex
    *  many segments for merging to keep up, to wait until merges catch
    *  up. Applications that can take other less drastic measures, such
    *  as limiting how many threads are allowed to index, can do nothing
-   *  here and throttle elsewhere. */
+   *  here and throttle elsewhere.
+   *
+   *  If this method wants to stall but the calling thread is a merge
+   *  thread, it should return false to tell caller not to kick off
+   *  any new merges. */
 
-  protected synchronized void maybeStall(IndexWriter writer) {
+  protected synchronized boolean maybeStall(IndexWriter writer) {
     long startStallTime = 0;
     while (writer.hasPendingMerges() && mergeThreadCount() >= maxMergeCount) {
+
       // This means merging has fallen too far behind: we
       // have already created maxMergeCount threads, and
       // now there's at least one more merge pending.
@@ -495,22 +512,35 @@ public class ConcurrentMergeScheduler ex
       // updateMergeThreads).  We stall this producer
       // thread to prevent creation of new segments,
       // until merging has caught up:
+
+      if (mergeThreads.contains(Thread.currentThread())) {
+        // Never stall a merge thread since this blocks the thread from
+        // finishing and calling updateMergeThreads, and blocking it
+        // accomplishes nothing anyway (it's not really a segment producer):
+        return false;
+      }
+
       if (verbose() && startStallTime == 0) {
         message("    too many merges; stalling...");
       }
       startStallTime = System.currentTimeMillis();
-      try {
-        // Only wait 0.25 seconds, so if all merges are aborted (by IW.rollback) we notice:
-        wait(250);
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
-      }
+      doStall();
     }
 
-    if (verbose()) {
-      if (startStallTime != 0) {
-        message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
-      }
+    if (verbose() && startStallTime != 0) {
+      message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
+    }
+
+    return true;
+  }
+
+  /** Called from {@link #maybeStall} to pause the calling thread for a bit. */
+  protected synchronized void doStall() {
+    try {
+      // Defensively wait for only .25 seconds in case we are missing a .notify/All somewhere:
+      wait(250);
+    } catch (InterruptedException ie) {
+      throw new ThreadInterruptedException(ie);
     }
   }
 
@@ -560,8 +590,6 @@ public class ConcurrentMergeScheduler ex
           message("  merge thread: done");
         }
 
-        removeMergeThread(this);
-
         // Let CMS run new merges if necessary:
         try {
           merge(writer, MergeTrigger.MERGE_FINISHED, true);
@@ -583,6 +611,8 @@ public class ConcurrentMergeScheduler ex
 
       } finally {
         synchronized(ConcurrentMergeScheduler.this) {
+          removeMergeThread();
+
           updateMergeThreads();
 
           // In case we had stalled indexing, we can now wake up

Modified: lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java Sat Jan 24 15:18:12 2015
@@ -27,11 +27,13 @@ public enum MergeTrigger {
    * Merge was triggered by a segment flush.
    */
   SEGMENT_FLUSH,
+
   /**
    * Merge was triggered by a full flush. Full flushes
    * can be caused by a commit, NRT reader reopen or a close call on the index writer.
    */
   FULL_FLUSH,
+
   /**
    * Merge has been triggered explicitly by the user.
    */

Modified: lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Sat Jan 24 15:18:12 2015
@@ -460,7 +460,6 @@ public class TestConcurrentMergeSchedule
 
     // No merges should have run so far, because TMP has high segmentsPerTier:
     assertEquals(0, maxRunningMergeCount.get());
-
     w.forceMerge(1);
 
     // At most 5 merge threads should have launched at once:
@@ -489,8 +488,9 @@ public class TestConcurrentMergeSchedule
     IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
     iwc.setMergeScheduler(new ConcurrentMergeScheduler() {
         @Override
-        protected void maybeStall(IndexWriter writer) {
+        protected boolean maybeStall(IndexWriter writer) {
           wasCalled.set(true);
+          return true;
         }
       });
     IndexWriter w = new IndexWriter(dir, iwc);
@@ -640,4 +640,42 @@ public class TestConcurrentMergeSchedule
     assertTrue(threadCount <= 4);
     assertEquals(5+threadCount, cms.getMaxMergeCount());
   }
+
+  // LUCENE-6197
+  public void testNoStallMergeThreads() throws Exception {
+    MockDirectoryWrapper dir = newMockDirectory();
+
+    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
+    iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+    iwc.setMaxBufferedDocs(2);
+    IndexWriter w = new IndexWriter(dir, iwc);
+    for(int i=0;i<1000;i++) {
+      Document doc = new Document();
+      doc.add(newStringField("field", ""+i, Field.Store.YES));
+      w.addDocument(doc);
+    }
+    w.close();
+
+    iwc = newIndexWriterConfig(new MockAnalyzer(random()));
+    final AtomicBoolean failed = new AtomicBoolean();
+    ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
+        @Override
+        protected void doStall() {
+          if (Thread.currentThread().getName().startsWith("Lucene Merge Thread")) {
+            failed.set(true);
+          }
+          super.doStall();
+        }
+      };
+    cms.setMaxMergesAndThreads(2, 1);
+    iwc.setMergeScheduler(cms);
+    iwc.setMaxBufferedDocs(2);
+
+    w = new IndexWriter(dir, iwc);
+    w.forceMerge(1);
+    w.close();
+    dir.close();
+
+    assertFalse(failed.get());
+  }
 }

Modified: lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java?rev=1654538&r1=1654537&r2=1654538&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java (original)
+++ lucene/dev/branches/lucene_solr_5_0/lucene/test-framework/src/java/org/apache/lucene/util/LuceneTestCase.java Sat Jan 24 15:18:12 2015
@@ -58,8 +58,8 @@ import java.util.logging.Logger;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.MockAnalyzer;
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.Field;
 import org.apache.lucene.document.FieldType;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
@@ -78,8 +78,8 @@ import org.apache.lucene.index.FieldInfo
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.Fields;
 import org.apache.lucene.index.IndexOptions;
-import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexReader.ReaderClosedListener;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.IndexableField;
@@ -107,8 +107,8 @@ import org.apache.lucene.index.SortedDoc
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.index.Terms;
-import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.index.TermsEnum.SeekStatus;
+import org.apache.lucene.index.TermsEnum;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.search.AssertingIndexSearcher;
 import org.apache.lucene.search.DocIdSet;
@@ -125,8 +125,8 @@ import org.apache.lucene.store.FlushInfo
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.MergeInfo;
-import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
+import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.store.NRTCachingDirectory;
 import org.apache.lucene.util.automaton.AutomatonTestUtil;
 import org.apache.lucene.util.automaton.CompiledAutomaton;
@@ -142,7 +142,6 @@ import org.junit.Test;
 import org.junit.rules.RuleChain;
 import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
-
 import com.carrotsearch.randomizedtesting.JUnit4MethodProvider;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
 import com.carrotsearch.randomizedtesting.MixWithSuiteName;
@@ -153,16 +152,16 @@ import com.carrotsearch.randomizedtestin
 import com.carrotsearch.randomizedtesting.annotations.SeedDecorators;
 import com.carrotsearch.randomizedtesting.annotations.TestGroup;
 import com.carrotsearch.randomizedtesting.annotations.TestMethodProviders;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction.Action;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup.Group;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakGroup;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope.Scope;
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies.Consequence;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakZombies;
 import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.carrotsearch.randomizedtesting.rules.NoClassHooksShadowingRule;
@@ -912,7 +911,8 @@ public abstract class LuceneTestCase ext
       } else {
         cms = new ConcurrentMergeScheduler() {
             @Override
-            protected synchronized void maybeStall(IndexWriter writer) {
+            protected synchronized boolean maybeStall(IndexWriter writer) {
+              return true;
             }
           };
       }