You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2010/12/19 15:03:51 UTC
svn commit: r1050855 - in /lucene/dev/branches/branch_3x: ./ lucene/
lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java solr/
Author: mikemccand
Date: Sun Dec 19 14:03:50 2010
New Revision: 1050855
URL: http://svn.apache.org/viewvc?rev=1050855&view=rev
Log:
LUCENE-2820: fix CMS to stop all threads during close
Modified:
lucene/dev/branches/branch_3x/ (props changed)
lucene/dev/branches/branch_3x/lucene/ (props changed)
lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
lucene/dev/branches/branch_3x/solr/ (props changed)
Modified: lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1050855&r1=1050854&r2=1050855&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/branch_3x/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Sun Dec 19 14:03:50 2010
@@ -46,6 +46,7 @@ import java.util.Comparator;
public class ConcurrentMergeScheduler extends MergeScheduler {
private int mergeThreadPriority = -1;
+ private volatile boolean closed;
protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();
@@ -65,7 +66,6 @@ public class ConcurrentMergeScheduler ex
protected Directory dir;
- private boolean closed;
protected IndexWriter writer;
protected int mergeThreadCount;
@@ -154,18 +154,37 @@ public class ConcurrentMergeScheduler ex
* pause & unpause threads. */
protected synchronized void updateMergeThreads() {
- CollectionUtil.mergeSort(mergeThreads, compareByMergeDocCount);
+ // Only look at threads that are alive & not in the
+ // process of stopping (ie have an active merge):
+ final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
+
+ int threadIdx = 0;
+ while (threadIdx < mergeThreads.size()) {
+ final MergeThread mergeThread = mergeThreads.get(threadIdx);
+ if (!mergeThread.isAlive()) {
+ // Prune any dead threads
+ mergeThreads.remove(threadIdx);
+ continue;
+ }
+ if (mergeThread.getCurrentMerge() != null) {
+ activeMerges.add(mergeThread);
+ }
+ threadIdx++;
+ }
+
+ CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
- final int count = mergeThreads.size();
int pri = mergeThreadPriority;
- for(int i=0;i<count;i++) {
- final MergeThread mergeThread = mergeThreads.get(i);
+ final int activeMergeCount = activeMerges.size();
+ for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
+ final MergeThread mergeThread = activeMerges.get(threadIdx);
final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
- if (merge == null) {
+ if (merge == null) {
continue;
}
+
final boolean doPause;
- if (i < count-maxThreadCount) {
+ if (threadIdx < activeMergeCount-maxThreadCount) {
doPause = true;
} else {
doPause = false;
@@ -216,22 +235,29 @@ public class ConcurrentMergeScheduler ex
@Override
public void close() {
closed = true;
+ sync();
}
- public synchronized void sync() {
- while(mergeThreadCount() > 0) {
- if (verbose())
- message("now wait for threads; currently " + mergeThreads.size() + " still running");
- final int count = mergeThreads.size();
- if (verbose()) {
- for(int i=0;i<count;i++)
- message(" " + i + ": " + mergeThreads.get(i));
+ /** Wait for any running merge threads to finish */
+ public void sync() {
+ while(true) {
+ MergeThread toSync = null;
+ synchronized(this) {
+ for(MergeThread t : mergeThreads) {
+ if (t.isAlive()) {
+ toSync = t;
+ break;
+ }
+ }
}
-
- try {
- wait();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
+ if (toSync != null) {
+ try {
+ toSync.join();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
+ }
+ } else {
+ break;
}
}
}
@@ -318,11 +344,17 @@ public class ConcurrentMergeScheduler ex
// merge:
merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
- updateMergeThreads();
- if (verbose())
+ if (verbose()) {
message(" launch new thread [" + merger.getName() + "]");
+ }
merger.start();
+
+ // Must call this after starting the thread else
+ // the new thread is removed from mergeThreads
+ // (since it's not alive yet):
+ updateMergeThreads();
+
success = true;
}
} finally {
@@ -416,7 +448,6 @@ public class ConcurrentMergeScheduler ex
message(" merge thread: do another merge " + merge.segString(dir));
} else {
done = true;
- updateMergeThreads();
break;
}
}
@@ -437,10 +468,8 @@ public class ConcurrentMergeScheduler ex
}
} finally {
synchronized(ConcurrentMergeScheduler.this) {
- ConcurrentMergeScheduler.this.notifyAll();
- boolean removed = mergeThreads.remove(this);
- assert removed;
updateMergeThreads();
+ ConcurrentMergeScheduler.this.notifyAll();
}
}
}