You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2010/12/19 18:07:24 UTC
svn commit: r1050899 -
/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
Author: mikemccand
Date: Sun Dec 19 17:07:24 2010
New Revision: 1050899
URL: http://svn.apache.org/viewvc?rev=1050899&view=rev
Log:
LUCENE-2820: revert until I find the cause of the deadlock
Modified:
lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
Modified: lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=1050899&r1=1050898&r2=1050899&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/dev/trunk/lucene/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Sun Dec 19 17:07:24 2010
@@ -65,6 +65,7 @@ public class ConcurrentMergeScheduler ex
protected Directory dir;
+ private boolean closed;
protected IndexWriter writer;
protected int mergeThreadCount;
@@ -146,37 +147,18 @@ public class ConcurrentMergeScheduler ex
* pause & unpause threads. */
protected synchronized void updateMergeThreads() {
- // Only look at threads that are alive & not in the
- // process of stopping (ie have an active merge):
- final List<MergeThread> activeMerges = new ArrayList<MergeThread>();
-
- int threadIdx = 0;
- while (threadIdx < mergeThreads.size()) {
- final MergeThread mergeThread = mergeThreads.get(threadIdx);
- if (!mergeThread.isAlive()) {
- // Prune any dead threads
- mergeThreads.remove(threadIdx);
- continue;
- }
- if (mergeThread.getCurrentMerge() != null) {
- activeMerges.add(mergeThread);
- }
- threadIdx++;
- }
-
- CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);
+ CollectionUtil.mergeSort(mergeThreads, compareByMergeDocCount);
+ final int count = mergeThreads.size();
int pri = mergeThreadPriority;
- final int activeMergeCount = activeMerges.size();
- for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
- final MergeThread mergeThread = activeMerges.get(threadIdx);
+ for(int i=0;i<count;i++) {
+ final MergeThread mergeThread = mergeThreads.get(i);
final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
- if (merge == null) {
+ if (merge == null) {
continue;
}
-
final boolean doPause;
- if (threadIdx < activeMergeCount-maxThreadCount) {
+ if (i < count-maxThreadCount) {
doPause = true;
} else {
doPause = false;
@@ -226,29 +208,23 @@ public class ConcurrentMergeScheduler ex
@Override
public void close() {
- sync();
+ closed = true;
}
- /** Wait for any running merge threads to finish */
- public void sync() {
- while(true) {
- MergeThread toSync = null;
- synchronized(this) {
- for(MergeThread t : mergeThreads) {
- if (t.isAlive()) {
- toSync = t;
- break;
- }
- }
+ public synchronized void sync() {
+ while(mergeThreadCount() > 0) {
+ if (verbose())
+ message("now wait for threads; currently " + mergeThreads.size() + " still running");
+ final int count = mergeThreads.size();
+ if (verbose()) {
+ for(int i=0;i<count;i++)
+ message(" " + i + ": " + mergeThreads.get(i));
}
- if (toSync != null) {
- try {
- toSync.join();
- } catch (InterruptedException ie) {
- throw new ThreadInterruptedException(ie);
- }
- } else {
- break;
+
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ throw new ThreadInterruptedException(ie);
}
}
}
@@ -335,17 +311,11 @@ public class ConcurrentMergeScheduler ex
// merge:
merger = getMergeThread(writer, merge);
mergeThreads.add(merger);
- if (verbose()) {
+ updateMergeThreads();
+ if (verbose())
message(" launch new thread [" + merger.getName() + "]");
- }
merger.start();
-
- // Must call this after starting the thread else
- // the new thread is removed from mergeThreads
- // (since it's not alive yet):
- updateMergeThreads();
-
success = true;
}
} finally {
@@ -439,6 +409,7 @@ public class ConcurrentMergeScheduler ex
message(" merge thread: do another merge " + merge.segString(dir));
} else {
done = true;
+ updateMergeThreads();
break;
}
}
@@ -458,8 +429,10 @@ public class ConcurrentMergeScheduler ex
}
} finally {
synchronized(ConcurrentMergeScheduler.this) {
- updateMergeThreads();
ConcurrentMergeScheduler.this.notifyAll();
+ boolean removed = mergeThreads.remove(this);
+ assert removed;
+ updateMergeThreads();
}
}
}