You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2010/12/19 15:03:51 UTC

svn commit: r1050855 - in /lucene/dev/branches/branch_3x: ./ lucene/ lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java solr/

Author: mikemccand
Date: Sun Dec 19 14:03:50 2010
New Revision: 1050855

URL: http://svn.apache.org/viewvc?rev=1050855&view=rev
Log:
LUCENE-2820: fix CMS to stop all threads during close

Modified:
    lucene/dev/branches/branch_3x/   (props changed)
    lucene/dev/branches/branch_3x/lucene/   (props changed)
    lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
    lucene/dev/branches/branch_3x/solr/   (props changed)

Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1050855&r1=1050854&r2=1050855&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Sun Dec 19 14:03:50 2010
@@ -46,6 +46,7 @@ import java.util.Comparator;
 public class ConcurrentMergeScheduler extends MergeScheduler {
 
   private int mergeThreadPriority = -1;
+  private volatile boolean closed;
 
   protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
 
@@ -65,7 +66,6 @@ public class ConcurrentMergeScheduler ex
 
   protected Directory dir;
 
-  private boolean closed;
   protected IndexWriter writer;
   protected int mergeThreadCount;
 
@@ -154,18 +154,37 @@ public class ConcurrentMergeScheduler ex
    *  pause & unpause threads. */
   protected synchronized void updateMergeThreads() {
 
-    CollectionUtil.mergeSort(mergeThreads, compareByMergeDocCount);
+    // Only look at threads that are alive & not in the
+    // process of stopping (ie have an active merge):
+    final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
+
+    int threadIdx = 0;
+    while (threadIdx < mergeThreads.size()) {
+      final MergeThread mergeThread = mergeThreads.get(threadIdx);
+      if (!mergeThread.isAlive()) {
+        // Prune any dead threads
+        mergeThreads.remove(threadIdx);
+        continue;
+      }
+      if (mergeThread.getCurrentMerge() != null) {
+        activeMerges.add(mergeThread);
+      }
+      threadIdx++;
+    }
+
+    CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
     
-    final int count = mergeThreads.size();
     int pri = mergeThreadPriority;
-    for(int i=0;i<count;i++) {
-      final MergeThread mergeThread = mergeThreads.get(i);
+    final int activeMergeCount = activeMerges.size();
+    for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
+      final MergeThread mergeThread = activeMerges.get(threadIdx);
       final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
-      if (merge == null) {
+      if (merge == null) { 
         continue;
       }
+
       final boolean doPause;
-      if (i < count-maxThreadCount) {
+      if (threadIdx < activeMergeCount-maxThreadCount) {
         doPause = true;
       } else {
         doPause = false;
@@ -216,22 +235,29 @@ public class ConcurrentMergeScheduler ex
   @Override
   public void close() {
     closed = true;
+    sync();
   }
 
-  public synchronized void sync() {
-    while(mergeThreadCount() > 0) {
-      if (verbose())
-        message("now wait for threads; currently " + mergeThreads.size() + " still running");
-      final int count = mergeThreads.size();
-      if (verbose()) {
-        for(int i=0;i<count;i++)
-          message("    " + i + ": " + mergeThreads.get(i));
+  /** Wait for any running merge threads to finish */
+  public void sync() {
+    while(true) {
+      MergeThread toSync = null;
+      synchronized(this) {
+        for(MergeThread t : mergeThreads) {
+          if (t.isAlive()) {
+            toSync = t;
+            break;
+          }
+        }
       }
-      
-      try {
-        wait();
-      } catch (InterruptedException ie) {
-        throw new ThreadInterruptedException(ie);
+      if (toSync != null) {
+        try {
+          toSync.join();
+        } catch (InterruptedException ie) {
+          throw new ThreadInterruptedException(ie);
+        }
+      } else {
+        break;
       }
     }
   }
@@ -318,11 +344,17 @@ public class ConcurrentMergeScheduler ex
           // merge:
           merger = getMergeThread(writer, merge);
           mergeThreads.add(merger);
-          updateMergeThreads();
-          if (verbose())
+          if (verbose()) {
             message("    launch new thread [" + merger.getName() + "]");
+          }
 
           merger.start();
+
+          // Must call this after starting the thread else
+          // the new thread is removed from mergeThreads
+          // (since it's not alive yet):
+          updateMergeThreads();
+
           success = true;
         }
       } finally {
@@ -416,7 +448,6 @@ public class ConcurrentMergeScheduler ex
               message("  merge thread: do another merge " + merge.segString(dir));
           } else {
             done = true;
-            updateMergeThreads();
             break;
           }
         }
@@ -437,10 +468,8 @@ public class ConcurrentMergeScheduler ex
         }
       } finally {
         synchronized(ConcurrentMergeScheduler.this) {
-          ConcurrentMergeScheduler.this.notifyAll();
-          boolean removed = mergeThreads.remove(this);
-          assert removed;
           updateMergeThreads();
+          ConcurrentMergeScheduler.this.notifyAll();
         }
       }
     }