You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by ju...@apache.org on 2009/08/07 10:47:40 UTC
svn commit: r801913 - in /jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: IndexMerger.java MultiIndex.java SearchIndex.java
Author: jukka
Date: Fri Aug 7 08:47:40 2009
New Revision: 801913
URL: http://svn.apache.org/viewvc?rev=801913&view=rev
Log:
1.x: Reverted revision 801245 as it used java.util.concurrent classes not available in Java 1.4 (JCR-1818)
Modified:
jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java?rev=801913&r1=801912&r2=801913&view=diff
==============================================================================
--- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (original)
+++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java Fri Aug 7 08:47:40 2009
@@ -18,6 +18,9 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexReader;
+import org.apache.commons.collections.Buffer;
+import org.apache.commons.collections.BufferUtils;
+import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,15 +28,15 @@
import java.util.Collections;
import java.util.ArrayList;
import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Semaphore;
import java.io.IOException;
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+
/**
* Merges indexes in a separate daemon thread.
*/
-class IndexMerger implements IndexListener {
+class IndexMerger extends Thread implements IndexListener {
/**
* Logger instance for this class.
@@ -63,7 +66,13 @@
/**
* Queue of merge Tasks
*/
- private final BlockingQueue mergeTasks = new LinkedBlockingQueue();
+ private final Buffer mergeTasks = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
+
+ /**
+ * List of id <code>Term</code> that identify documents that were deleted
+ * while a merge was running.
+ */
+ private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
/**
* List of <code>IndexBucket</code>s in ascending document limit.
@@ -83,41 +92,27 @@
/**
* Mutex that is acquired when replacing indexes on MultiIndex.
*/
- private final Semaphore indexReplacement;
+ private final Sync indexReplacement = new Mutex();
/**
- * List of merger threads that are currently busy.
+ * When released, indicates that this index merger is idle.
*/
- private final List busyMergers = new ArrayList();
-
- /**
- * List of merger threads.
- */
- private final List workers = new ArrayList();
+ private final Sync mergerIdle = new Mutex();
/**
* Creates an <code>IndexMerger</code>.
*
* @param multiIndex the <code>MultiIndex</code>.
- * @param numWorkers the number of worker threads to use.
*/
- IndexMerger(MultiIndex multiIndex, int numWorkers) {
+ IndexMerger(MultiIndex multiIndex) {
this.multiIndex = multiIndex;
- for (int i = 0; i < numWorkers; i++) {
- Worker w = new Worker();
- workers.add(w);
- busyMergers.add(w);
- }
- this.indexReplacement = new Semaphore(workers.size());
- }
-
- /**
- * Starts this index merger.
- */
- void start() {
- Iterator iterator = workers.iterator();
- while (iterator.hasNext()) {
- ((Worker) iterator.next()).start();
+ setName("IndexMerger");
+ setDaemon(true);
+ try {
+ mergerIdle.acquire();
+ } catch (InterruptedException e) {
+ // will never happen, lock is free upon construction
+ throw new InternalError("Unable to acquire mutex after construction");
}
}
@@ -155,9 +150,8 @@
// put index in bucket
IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1);
- Iterator iterator = indexBuckets.iterator();
- while (iterator.hasNext()) {
- bucket = (IndexBucket) iterator.next();
+ for (int i = 0; i < indexBuckets.size(); i++) {
+ bucket = (IndexBucket) indexBuckets.get(i);
if (bucket.fits(numDocs)) {
break;
}
@@ -190,15 +184,8 @@
if (log.isDebugEnabled()) {
log.debug("requesting merge for " + indexesToMerge);
}
- addMergeTask(new Merge(idxs));
- if (log.isDebugEnabled()) {
- log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
- int numBusy;
- synchronized (busyMergers) {
- numBusy = busyMergers.size();
- }
- log.debug("# of busy merge workers: " + numBusy);
- }
+ mergeTasks.add(new Merge(idxs));
+ log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
}
}
}
@@ -209,28 +196,18 @@
*/
public void documentDeleted(Term id) {
log.debug("document deleted: " + id.text());
- synchronized (busyMergers) {
- Iterator iterator = busyMergers.iterator();
- while (iterator.hasNext()) {
- ((Worker) iterator.next()).documentDeleted(id);
- }
- }
+ deletedDocuments.add(id);
}
/**
* When the calling thread returns this index merger will be idle, that is
- * there will be no merge tasks pending anymore. The method returns
- * immediately if there are currently no tasks pending at all.
- *
- * @throws InterruptedException if this thread is interrupted while waiting
- * for the worker threads to become idle.
+ * there will be no merge tasks pending anymore. The method returns immediately
+ * if there are currently no tasks pending at all.
*/
void waitUntilIdle() throws InterruptedException {
- synchronized (busyMergers) {
- while (!busyMergers.isEmpty()) {
- busyMergers.wait();
- }
- }
+ mergerIdle.acquire();
+ // and immediately release again
+ mergerIdle.release();
}
/**
@@ -239,50 +216,135 @@
*/
void dispose() {
log.debug("dispose IndexMerger");
- // get all permits for index replacements
+ // get mutex for index replacements
try {
- indexReplacement.acquire(workers.size());
+ indexReplacement.acquire();
} catch (InterruptedException e) {
- log.warn("Interrupted while acquiring index replacement permits: " + e);
+ log.warn("Interrupted while acquiring index replacement sync: " + e);
// try to stop IndexMerger without the sync
}
- log.debug("merge queue size: " + mergeTasks.size());
// clear task queue
mergeTasks.clear();
// send quit
- addMergeTask(QUIT);
+ mergeTasks.add(QUIT);
log.debug("quit sent");
try {
- // give the merger threads some time to quit,
- // it is possible that the mergers are busy working on a large index.
+ // give the merger thread some time to quit,
+ // it is possible that the merger is busy working on a large index.
// if that is the case we will just ignore it and the daemon will
// die without being able to finish the merge.
// at this point it is not possible anymore to replace indexes
- // on the MultiIndex because we hold all indexReplacement permits.
- Iterator iterator = workers.iterator();
- while (iterator.hasNext()) {
- Thread t = (Thread) iterator.next();
- t.join(500);
- if (t.isAlive()) {
- log.info("Unable to stop IndexMerger.Worker. Daemon is busy.");
- } else {
- log.debug("IndexMerger.Worker thread stopped");
- }
+ // on the MultiIndex because we hold the indexReplacement Sync.
+ this.join(500);
+ if (isAlive()) {
+ log.info("Unable to stop IndexMerger. Daemon is busy.");
+ } else {
+ log.debug("IndexMerger thread stopped");
}
+ log.debug("merge queue size: " + mergeTasks.size());
} catch (InterruptedException e) {
- log.warn("Interrupted while waiting for IndexMerger threads to terminate.");
+ log.warn("Interrupted while waiting for IndexMerger thread to terminate.");
+ }
+ }
+
+ /**
+ * Implements the index merging.
+ */
+ public void run() {
+ for (;;) {
+ boolean isIdle = false;
+ if (mergeTasks.size() == 0) {
+ mergerIdle.release();
+ isIdle = true;
+ }
+ Merge task = (Merge) mergeTasks.remove();
+ if (task == QUIT) {
+ mergerIdle.release();
+ break;
+ }
+ if (isIdle) {
+ try {
+ mergerIdle.acquire();
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ log.warn("Unable to acquire mergerIdle sync");
+ }
+ }
+
+ log.debug("accepted merge request");
+
+ // reset deleted documents
+ deletedDocuments.clear();
+
+ // get readers
+ String[] names = new String[task.indexes.length];
+ for (int i = 0; i < task.indexes.length; i++) {
+ names[i] = task.indexes[i].name;
+ }
+ try {
+ log.debug("create new index");
+ PersistentIndex index = multiIndex.getOrCreateIndex(null);
+ boolean success = false;
+ try {
+
+ log.debug("get index readers from MultiIndex");
+ IndexReader[] readers = multiIndex.getIndexReaders(names, this);
+ try {
+ // do the merge
+ long time = System.currentTimeMillis();
+ index.addIndexes(readers);
+ time = System.currentTimeMillis() - time;
+ int docCount = 0;
+ for (int i = 0; i < readers.length; i++) {
+ docCount += readers[i].numDocs();
+ }
+ log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
+ } finally {
+ for (int i = 0; i < readers.length; i++) {
+ try {
+ Util.closeOrRelease(readers[i]);
+ } catch (IOException e) {
+ log.warn("Unable to close IndexReader: " + e);
+ }
+ }
+ }
+
+ // inform multi index
+ // if we cannot get the sync immediately we have to quit
+ if (!indexReplacement.attempt(0)) {
+ log.debug("index merging canceled");
+ break;
+ }
+ try {
+ log.debug("replace indexes");
+ multiIndex.replaceIndexes(names, index, deletedDocuments);
+ } finally {
+ indexReplacement.release();
+ }
+
+ success = true;
+
+ } finally {
+ if (!success) {
+ // delete index
+ log.debug("deleting index " + index.getName());
+ multiIndex.deleteIndex(index);
+ }
+ }
+ } catch (Throwable e) {
+ log.error("Error while merging indexes: ", e);
+ }
}
+ log.info("IndexMerger terminated");
}
//-----------------------< merge properties >-------------------------------
/**
* The merge factor.
- *
- * @param mergeFactor the merge factor.
*/
public void setMergeFactor(int mergeFactor) {
this.mergeFactor = mergeFactor;
@@ -291,8 +353,6 @@
/**
* The initial threshold for number of documents to merge to a new index.
- *
- * @param minMergeDocs the min merge docs number.
*/
public void setMinMergeDocs(int minMergeDocs) {
this.minMergeDocs = minMergeDocs;
@@ -300,8 +360,6 @@
/**
* The maximum number of document to merge.
- *
- * @param maxMergeDocs the max merge docs number.
*/
public void setMaxMergeDocs(int maxMergeDocs) {
this.maxMergeDocs = maxMergeDocs;
@@ -309,18 +367,6 @@
//------------------------------< internal >--------------------------------
- private void addMergeTask(Merge task) {
- for (;;) {
- try {
- mergeTasks.put(task);
- break;
- } catch (InterruptedException e) {
- // try again
- Thread.interrupted();
- }
- }
- }
-
/**
* Implements a simple struct that holds the name of an index and how
* many document it contains. <code>Index</code> is comparable using the
@@ -402,8 +448,6 @@
*/
private static final class IndexBucket extends ArrayList {
- private static final long serialVersionUID = 2985514550083374904L;
-
/**
* The lower document limit.
*/
@@ -452,130 +496,4 @@
return allowMerge;
}
}
-
- private class Worker extends Thread implements IndexListener {
-
- /**
- * List of id <code>Term</code> that identify documents that were deleted
- * while a merge was running.
- */
- private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
-
- public Worker() {
- setName("IndexMerger.Worker");
- setDaemon(true);
- }
-
- /**
- * Implements the index merging.
- */
- public void run() {
- for (;;) {
- boolean isIdle = false;
- if (mergeTasks.size() == 0) {
- synchronized (busyMergers) {
- busyMergers.remove(this);
- busyMergers.notifyAll();
- }
- isIdle = true;
- }
- Merge task;
- for (;;) {
- try {
- task = (Merge) mergeTasks.take();
- break;
- } catch (InterruptedException e) {
- // try again
- Thread.interrupted();
- }
- }
- if (task == QUIT) {
- synchronized (busyMergers) {
- busyMergers.remove(this);
- }
- // put back QUIT to signal other workers
- addMergeTask(task);
- break;
- }
- if (isIdle) {
- synchronized (busyMergers) {
- busyMergers.add(this);
- }
- }
-
- log.debug("accepted merge request");
-
- // reset deleted documents
- deletedDocuments.clear();
-
- // get readers
- String[] names = new String[task.indexes.length];
- for (int i = 0; i < task.indexes.length; i++) {
- names[i] = task.indexes[i].name;
- }
- try {
- log.debug("create new index");
- PersistentIndex index = multiIndex.getOrCreateIndex(null);
- boolean success = false;
- try {
-
- log.debug("get index readers from MultiIndex");
- IndexReader[] readers = multiIndex.getIndexReaders(names, IndexMerger.this);
- try {
- // do the merge
- long time = System.currentTimeMillis();
- index.addIndexes(readers);
- time = System.currentTimeMillis() - time;
- int docCount = 0;
- for (int i = 0; i < readers.length; i++) {
- docCount += readers[i].numDocs();
- }
- log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
- } finally {
- for (int i = 0; i < readers.length; i++) {
- try {
- Util.closeOrRelease(readers[i]);
- } catch (IOException e) {
- log.warn("Unable to close IndexReader: " + e);
- }
- }
- }
-
- // inform multi index
- // if we cannot get the sync immediately we have to quit
- if (!indexReplacement.tryAcquire()) {
- log.debug("index merging canceled");
- break;
- }
- try {
- log.debug("replace indexes");
- multiIndex.replaceIndexes(names, index, deletedDocuments);
- } finally {
- indexReplacement.release();
- }
-
- success = true;
-
- } finally {
- if (!success) {
- // delete index
- log.debug("deleting index " + index.getName());
- multiIndex.deleteIndex(index);
- }
- }
- } catch (Throwable e) {
- log.error("Error while merging indexes: ", e);
- }
- }
- log.info("IndexMerger.Worker terminated");
- }
-
- /**
- * @inheritDoc
- */
- public void documentDeleted(Term id) {
- log.debug("document deleted: " + id.text());
- deletedDocuments.add(id);
- }
- }
}
Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=801913&r1=801912&r2=801913&view=diff
==============================================================================
--- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original)
+++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Fri Aug 7 08:47:40 2009
@@ -241,7 +241,7 @@
removeDeletable();
// initialize IndexMerger
- merger = new IndexMerger(this, handler.getIndexMergerPoolSize());
+ merger = new IndexMerger(this);
merger.setMaxMergeDocs(handler.getMaxMergeDocs());
merger.setMergeFactor(handler.getMergeFactor());
merger.setMinMergeDocs(handler.getMinMergeDocs());
Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java?rev=801913&r1=801912&r2=801913&view=diff
==============================================================================
--- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (original)
+++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java Fri Aug 7 08:47:40 2009
@@ -167,11 +167,6 @@
public static final int DEFAULT_TERM_INFOS_INDEX_DIVISOR = 1;
/**
- * The default value for {@link #indexMergerPoolSize}.
- */
- public static final int DEFAULT_INDEX_MERGER_POOL_SIZE = 2;
-
- /**
* The path factory.
*/
protected static final PathFactory PATH_FACTORY = PathFactoryImpl.getInstance();
@@ -445,11 +440,6 @@
private boolean initializeHierarchyCache = true;
/**
- * The number of worker threads for merging index segments.
- */
- private int indexMergerPoolSize = DEFAULT_INDEX_MERGER_POOL_SIZE;
-
- /**
* Indicates if this <code>SearchIndex</code> is closed and cannot be used
* anymore.
*/
@@ -708,8 +698,6 @@
/**
* This method returns the QueryNodeFactory used to parse Queries. This method
* may be overridden to provide a customized QueryNodeFactory
- *
- * @return the query node factory.
*/
protected DefaultQueryNodeFactory getQueryNodeFactory() {
return DEFAULT_QUERY_NODE_FACTORY;
@@ -2155,26 +2143,6 @@
this.initializeHierarchyCache = initializeHierarchyCache;
}
- /**
- * @return the current size of the index merger pool.
- */
- public int getIndexMergerPoolSize() {
- return indexMergerPoolSize;
- }
-
- /**
- * Sets a new value for the index merger pool size.
- *
- * @param indexMergerPoolSize the number of worker threads.
- * @throws IllegalArgumentException if the size is less than or equal 0.
- */
- public void setIndexMergerPoolSize(int indexMergerPoolSize) {
- if (indexMergerPoolSize <= 0) {
- throw new IllegalArgumentException("must be greater than 0");
- }
- this.indexMergerPoolSize = indexMergerPoolSize;
- }
-
//----------------------------< internal >----------------------------------
/**