You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by ju...@apache.org on 2009/08/07 10:47:40 UTC

svn commit: r801913 - in /jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene: IndexMerger.java MultiIndex.java SearchIndex.java

Author: jukka
Date: Fri Aug  7 08:47:40 2009
New Revision: 801913

URL: http://svn.apache.org/viewvc?rev=801913&view=rev
Log:
1.x: Reverted revision 801245 as it used java.util.concurrent classes not available in Java 1.4 (JCR-1818)

Modified:
    jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
    jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
    jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java

Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java?rev=801913&r1=801912&r2=801913&view=diff
==============================================================================
--- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (original)
+++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java Fri Aug  7 08:47:40 2009
@@ -18,6 +18,9 @@
 
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.IndexReader;
+import org.apache.commons.collections.Buffer;
+import org.apache.commons.collections.BufferUtils;
+import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,15 +28,15 @@
 import java.util.Collections;
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.io.IOException;
 
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+
 /**
  * Merges indexes in a separate daemon thread.
  */
-class IndexMerger implements IndexListener {
+class IndexMerger extends Thread implements IndexListener {
 
     /**
      * Logger instance for this class.
@@ -63,7 +66,13 @@
     /**
      * Queue of merge Tasks
      */
-    private final BlockingQueue mergeTasks = new LinkedBlockingQueue();
+    private final Buffer mergeTasks = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
+
+    /**
+     * List of id <code>Term</code> that identify documents that were deleted
+     * while a merge was running.
+     */
+    private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
 
     /**
      * List of <code>IndexBucket</code>s in ascending document limit.
@@ -83,41 +92,27 @@
     /**
      * Mutex that is acquired when replacing indexes on MultiIndex.
      */
-    private final Semaphore indexReplacement;
+    private final Sync indexReplacement = new Mutex();
 
     /**
-     * List of merger threads that are currently busy.
+     * When released, indicates that this index merger is idle.
      */
-    private final List busyMergers = new ArrayList();
-
-    /**
-     * List of merger threads.
-     */
-    private final List workers = new ArrayList();
+    private final Sync mergerIdle = new Mutex();
 
     /**
      * Creates an <code>IndexMerger</code>.
      *
      * @param multiIndex the <code>MultiIndex</code>.
-     * @param numWorkers the number of worker threads to use.
      */
-    IndexMerger(MultiIndex multiIndex, int numWorkers) {
+    IndexMerger(MultiIndex multiIndex) {
         this.multiIndex = multiIndex;
-        for (int i = 0; i < numWorkers; i++) {
-            Worker w = new Worker();
-            workers.add(w);
-            busyMergers.add(w);
-        }
-        this.indexReplacement = new Semaphore(workers.size());
-    }
-
-    /**
-     * Starts this index merger.
-     */
-    void start() {
-        Iterator iterator = workers.iterator();
-        while (iterator.hasNext()) {
-            ((Worker) iterator.next()).start();
+        setName("IndexMerger");
+        setDaemon(true);
+        try {
+            mergerIdle.acquire();
+        } catch (InterruptedException e) {
+            // will never happen, lock is free upon construction
+            throw new InternalError("Unable to acquire mutex after construction");
         }
     }
 
@@ -155,9 +150,8 @@
 
             // put index in bucket
             IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1);
-            Iterator iterator = indexBuckets.iterator();
-            while (iterator.hasNext()) {
-                bucket = (IndexBucket) iterator.next();
+            for (int i = 0; i < indexBuckets.size(); i++) {
+                bucket = (IndexBucket) indexBuckets.get(i);
                 if (bucket.fits(numDocs)) {
                     break;
                 }
@@ -190,15 +184,8 @@
                     if (log.isDebugEnabled()) {
                         log.debug("requesting merge for " + indexesToMerge);
                     }
-                    addMergeTask(new Merge(idxs));
-                    if (log.isDebugEnabled()) {
-                        log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
-                        int numBusy;
-                        synchronized (busyMergers) {
-                            numBusy = busyMergers.size();
-                        }
-                        log.debug("# of busy merge workers: " + numBusy);
-                    }
+                    mergeTasks.add(new Merge(idxs));
+                    log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
                 }
             }
         }
@@ -209,28 +196,18 @@
      */
     public void documentDeleted(Term id) {
         log.debug("document deleted: " + id.text());
-        synchronized (busyMergers) {
-            Iterator iterator = busyMergers.iterator();
-            while (iterator.hasNext()) {
-                ((Worker) iterator.next()).documentDeleted(id);
-            }
-        }
+        deletedDocuments.add(id);
     }
 
     /**
      * When the calling thread returns this index merger will be idle, that is
-     * there will be no merge tasks pending anymore. The method returns
-     * immediately if there are currently no tasks pending at all.
-     *
-     * @throws InterruptedException if this thread is interrupted while waiting
-     *                              for the worker threads to become idle.
+     * there will be no merge tasks pending anymore. The method returns immediately
+     * if there are currently no tasks pending at all.
      */
     void waitUntilIdle() throws InterruptedException {
-        synchronized (busyMergers) {
-            while (!busyMergers.isEmpty()) {
-                busyMergers.wait();
-            }
-        }
+        mergerIdle.acquire();
+        // and immediately release again
+        mergerIdle.release();
     }
 
     /**
@@ -239,50 +216,135 @@
      */
     void dispose() {
         log.debug("dispose IndexMerger");
-        // get all permits for index replacements
+        // get mutex for index replacements
         try {
-            indexReplacement.acquire(workers.size());
+            indexReplacement.acquire();
         } catch (InterruptedException e) {
-            log.warn("Interrupted while acquiring index replacement permits: " + e);
+            log.warn("Interrupted while acquiring index replacement sync: " + e);
             // try to stop IndexMerger without the sync
         }
 
-        log.debug("merge queue size: " + mergeTasks.size());
         // clear task queue
         mergeTasks.clear();
 
         // send quit
-        addMergeTask(QUIT);
+        mergeTasks.add(QUIT);
         log.debug("quit sent");
 
         try {
-            // give the merger threads some time to quit,
-            // it is possible that the mergers are busy working on a large index.
+            // give the merger thread some time to quit,
+            // it is possible that the merger is busy working on a large index.
             // if that is the case we will just ignore it and the daemon will
             // die without being able to finish the merge.
             // at this point it is not possible anymore to replace indexes
-            // on the MultiIndex because we hold all indexReplacement permits.
-            Iterator iterator = workers.iterator();
-            while (iterator.hasNext()) {
-                Thread t = (Thread) iterator.next();
-                t.join(500);
-                if (t.isAlive()) {
-                    log.info("Unable to stop IndexMerger.Worker. Daemon is busy.");
-                } else {
-                    log.debug("IndexMerger.Worker thread stopped");
-                }
+            // on the MultiIndex because we hold the indexReplacement Sync.
+            this.join(500);
+            if (isAlive()) {
+                log.info("Unable to stop IndexMerger. Daemon is busy.");
+            } else {
+                log.debug("IndexMerger thread stopped");
             }
+            log.debug("merge queue size: " + mergeTasks.size());
         } catch (InterruptedException e) {
-            log.warn("Interrupted while waiting for IndexMerger threads to terminate.");
+            log.warn("Interrupted while waiting for IndexMerger thread to terminate.");
+        }
+    }
+
+    /**
+     * Implements the index merging.
+     */
+    public void run() {
+        for (;;) {
+            boolean isIdle = false;
+            if (mergeTasks.size() == 0) {
+                mergerIdle.release();
+                isIdle = true;
+            }
+            Merge task = (Merge) mergeTasks.remove();
+            if (task == QUIT) {
+                mergerIdle.release();
+                break;
+            }
+            if (isIdle) {
+                try {
+                    mergerIdle.acquire();
+                } catch (InterruptedException e) {
+                    Thread.interrupted();
+                    log.warn("Unable to acquire mergerIdle sync");
+                }
+            }
+
+            log.debug("accepted merge request");
+
+            // reset deleted documents
+            deletedDocuments.clear();
+
+            // get readers
+            String[] names = new String[task.indexes.length];
+            for (int i = 0; i < task.indexes.length; i++) {
+                names[i] = task.indexes[i].name;
+            }
+            try {
+                log.debug("create new index");
+                PersistentIndex index = multiIndex.getOrCreateIndex(null);
+                boolean success = false;
+                try {
+
+                    log.debug("get index readers from MultiIndex");
+                    IndexReader[] readers = multiIndex.getIndexReaders(names, this);
+                    try {
+                        // do the merge
+                        long time = System.currentTimeMillis();
+                        index.addIndexes(readers);
+                        time = System.currentTimeMillis() - time;
+                        int docCount = 0;
+                        for (int i = 0; i < readers.length; i++) {
+                            docCount += readers[i].numDocs();
+                        }
+                        log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
+                    } finally {
+                        for (int i = 0; i < readers.length; i++) {
+                            try {
+                                Util.closeOrRelease(readers[i]);
+                            } catch (IOException e) {
+                                log.warn("Unable to close IndexReader: " + e);
+                            }
+                        }
+                    }
+
+                    // inform multi index
+                    // if we cannot get the sync immediately we have to quit
+                    if (!indexReplacement.attempt(0)) {
+                        log.debug("index merging canceled");
+                        break;
+                    }
+                    try {
+                        log.debug("replace indexes");
+                        multiIndex.replaceIndexes(names, index, deletedDocuments);
+                    } finally {
+                        indexReplacement.release();
+                    }
+
+                    success = true;
+
+                } finally {
+                    if (!success) {
+                        // delete index
+                        log.debug("deleting index " + index.getName());
+                        multiIndex.deleteIndex(index);
+                    }
+                }
+            } catch (Throwable e) {
+                log.error("Error while merging indexes: ", e);
+            }
         }
+        log.info("IndexMerger terminated");
     }
 
     //-----------------------< merge properties >-------------------------------
 
     /**
      * The merge factor.
-     *
-     * @param mergeFactor the merge factor.
      */
     public void setMergeFactor(int mergeFactor) {
         this.mergeFactor = mergeFactor;
@@ -291,8 +353,6 @@
 
     /**
      * The initial threshold for number of documents to merge to a new index.
-     *
-     * @param minMergeDocs the min merge docs number.
      */
     public void setMinMergeDocs(int minMergeDocs) {
         this.minMergeDocs = minMergeDocs;
@@ -300,8 +360,6 @@
 
     /**
      * The maximum number of document to merge.
-     *
-     * @param maxMergeDocs the max merge docs number.
      */
     public void setMaxMergeDocs(int maxMergeDocs) {
         this.maxMergeDocs = maxMergeDocs;
@@ -309,18 +367,6 @@
 
     //------------------------------< internal >--------------------------------
 
-    private void addMergeTask(Merge task) {
-        for (;;) {
-            try {
-                mergeTasks.put(task);
-                break;
-            } catch (InterruptedException e) {
-                // try again
-                Thread.interrupted();
-            }
-        }
-    }
-
     /**
      * Implements a simple struct that holds the name of an index and how
      * many document it contains. <code>Index</code> is comparable using the
@@ -402,8 +448,6 @@
      */
     private static final class IndexBucket extends ArrayList {
 
-        private static final long serialVersionUID = 2985514550083374904L;
-
         /**
          * The lower document limit.
          */
@@ -452,130 +496,4 @@
             return allowMerge;
         }
     }
-
-    private class Worker extends Thread implements IndexListener {
-
-        /**
-         * List of id <code>Term</code> that identify documents that were deleted
-         * while a merge was running.
-         */
-        private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
-
-        public Worker() {
-            setName("IndexMerger.Worker");
-            setDaemon(true);
-        }
-
-        /**
-         * Implements the index merging.
-         */
-        public void run() {
-            for (;;) {
-                boolean isIdle = false;
-                if (mergeTasks.size() == 0) {
-                    synchronized (busyMergers) {
-                        busyMergers.remove(this);
-                        busyMergers.notifyAll();
-                    }
-                    isIdle = true;
-                }
-                Merge task;
-                for (;;) {
-                    try {
-                        task = (Merge) mergeTasks.take();
-                        break;
-                    } catch (InterruptedException e) {
-                        // try again
-                        Thread.interrupted();
-                    }
-                }
-                if (task == QUIT) {
-                    synchronized (busyMergers) {
-                        busyMergers.remove(this);
-                    }
-                    // put back QUIT to signal other workers
-                    addMergeTask(task);
-                    break;
-                }
-                if (isIdle) {
-                    synchronized (busyMergers) {
-                        busyMergers.add(this);
-                    }
-                }
-
-                log.debug("accepted merge request");
-
-                // reset deleted documents
-                deletedDocuments.clear();
-
-                // get readers
-                String[] names = new String[task.indexes.length];
-                for (int i = 0; i < task.indexes.length; i++) {
-                    names[i] = task.indexes[i].name;
-                }
-                try {
-                    log.debug("create new index");
-                    PersistentIndex index = multiIndex.getOrCreateIndex(null);
-                    boolean success = false;
-                    try {
-
-                        log.debug("get index readers from MultiIndex");
-                        IndexReader[] readers = multiIndex.getIndexReaders(names, IndexMerger.this);
-                        try {
-                            // do the merge
-                            long time = System.currentTimeMillis();
-                            index.addIndexes(readers);
-                            time = System.currentTimeMillis() - time;
-                            int docCount = 0;
-                            for (int i = 0; i < readers.length; i++) {
-                                docCount += readers[i].numDocs();
-                            }
-                            log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
-                        } finally {
-                            for (int i = 0; i < readers.length; i++) {
-                                try {
-                                    Util.closeOrRelease(readers[i]);
-                                } catch (IOException e) {
-                                    log.warn("Unable to close IndexReader: " + e);
-                                }
-                            }
-                        }
-
-                        // inform multi index
-                        // if we cannot get the sync immediately we have to quit
-                        if (!indexReplacement.tryAcquire()) {
-                            log.debug("index merging canceled");
-                            break;
-                        }
-                        try {
-                            log.debug("replace indexes");
-                            multiIndex.replaceIndexes(names, index, deletedDocuments);
-                        } finally {
-                            indexReplacement.release();
-                        }
-
-                        success = true;
-
-                    } finally {
-                        if (!success) {
-                            // delete index
-                            log.debug("deleting index " + index.getName());
-                            multiIndex.deleteIndex(index);
-                        }
-                    }
-                } catch (Throwable e) {
-                    log.error("Error while merging indexes: ", e);
-                }
-            }
-            log.info("IndexMerger.Worker terminated");
-        }
-
-        /**
-         * @inheritDoc
-         */
-        public void documentDeleted(Term id) {
-            log.debug("document deleted: " + id.text());
-            deletedDocuments.add(id);
-        }
-    }
 }

Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=801913&r1=801912&r2=801913&view=diff
==============================================================================
--- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original)
+++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Fri Aug  7 08:47:40 2009
@@ -241,7 +241,7 @@
         removeDeletable();
 
         // initialize IndexMerger
-        merger = new IndexMerger(this, handler.getIndexMergerPoolSize());
+        merger = new IndexMerger(this);
         merger.setMaxMergeDocs(handler.getMaxMergeDocs());
         merger.setMergeFactor(handler.getMergeFactor());
         merger.setMinMergeDocs(handler.getMinMergeDocs());

Modified: jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java?rev=801913&r1=801912&r2=801913&view=diff
==============================================================================
--- jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (original)
+++ jackrabbit/branches/1.x/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java Fri Aug  7 08:47:40 2009
@@ -167,11 +167,6 @@
     public static final int DEFAULT_TERM_INFOS_INDEX_DIVISOR = 1;
 
     /**
-     * The default value for {@link #indexMergerPoolSize}.
-     */
-    public static final int DEFAULT_INDEX_MERGER_POOL_SIZE = 2;
-
-    /**
      * The path factory.
      */
     protected static final PathFactory PATH_FACTORY = PathFactoryImpl.getInstance();
@@ -445,11 +440,6 @@
     private boolean initializeHierarchyCache = true;
 
     /**
-     * The number of worker threads for merging index segments.
-     */
-    private int indexMergerPoolSize = DEFAULT_INDEX_MERGER_POOL_SIZE;
-
-    /**
      * Indicates if this <code>SearchIndex</code> is closed and cannot be used
      * anymore.
      */
@@ -708,8 +698,6 @@
     /**
      * This method returns the QueryNodeFactory used to parse Queries. This method
      * may be overridden to provide a customized QueryNodeFactory
-     *
-     * @return the query node factory.
      */
     protected DefaultQueryNodeFactory getQueryNodeFactory() {
         return DEFAULT_QUERY_NODE_FACTORY;
@@ -2155,26 +2143,6 @@
         this.initializeHierarchyCache = initializeHierarchyCache;
     }
 
-    /**
-     * @return the current size of the index merger pool.
-     */
-    public int getIndexMergerPoolSize() {
-        return indexMergerPoolSize;
-    }
-
-    /**
-     * Sets a new value for the index merger pool size.
-     *
-     * @param indexMergerPoolSize the number of worker threads.
-     * @throws IllegalArgumentException if the size is less than or equal 0.
-     */
-    public void setIndexMergerPoolSize(int indexMergerPoolSize) {
-        if (indexMergerPoolSize <= 0) {
-            throw new IllegalArgumentException("must be greater than 0");
-        }
-        this.indexMergerPoolSize = indexMergerPoolSize;
-    }
-
     //----------------------------< internal >----------------------------------
 
     /**