You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2005/08/29 16:07:24 UTC

svn commit: r264144 - in /incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene: AbstractIndex.java IndexListener.java IndexMerger.java MultiIndex.java PersistentIndex.java ReadOnlyIndexReader.java SearchIndex.java

Author: mreutegg
Date: Mon Aug 29 07:07:16 2005
New Revision: 264144

URL: http://svn.apache.org/viewcvs?rev=264144&view=rev
Log:
JCR-197: Index merging should run in a separate thread

Added:
    incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java   (with props)
    incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java   (with props)
Modified:
    incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
    incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
    incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java
    incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java
    incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java

Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java Mon Aug 29 07:07:16 2005
@@ -171,7 +171,7 @@
      * @return a read-only index reader.
      * @throws IOException if an error occurs while obtaining the index reader.
      */
-    protected synchronized ReadOnlyIndexReader getReadOnlyIndexReader()
+    synchronized ReadOnlyIndexReader getReadOnlyIndexReader()
             throws IOException {
         // get current modifiable index reader
         IndexReader modifiableReader = getIndexReader();
@@ -190,7 +190,6 @@
         return new ReadOnlyIndexReader(sharedReader, deleted);
     }
 
-
     /**
      * Returns an <code>IndexWriter</code> on this index.
      * @return an <code>IndexWriter</code> on this index.
@@ -217,7 +216,18 @@
      * Commits all pending changes to the underlying <code>Directory</code>.
      * @throws IOException if an error occurs while commiting changes.
      */
-    protected synchronized void commit() throws IOException {
+    protected void commit() throws IOException {
+        commit(false);
+    }
+
+    /**
+     * Commits all pending changes to the underlying <code>Directory</code>.
+     *
+     * @param optimize if <code>true</code> the index is optimized after
+     *                 the commit.
+     * @throws IOException if an error occurs while committing changes.
+     */
+    protected synchronized void commit(boolean optimize) throws IOException {
         // if index is not locked there are no pending changes
         if (!IndexReader.isLocked(getDirectory())) {
             return;
@@ -230,6 +240,11 @@
             log.debug("committing IndexWriter.");
             indexWriter.close();
             indexWriter = null;
+        }
+        // optimize if requested
+        if (optimize) {
+            IndexWriter writer = getIndexWriter();
+            writer.optimize();
         }
     }
 

Added: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java?rev=264144&view=auto
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java (added)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java Mon Aug 29 07:07:16 2005
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2004-2005 The Apache Software Foundation or its licensors,
+ *                     as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.query.lucene;
+
+import org.apache.lucene.index.Term;
+
+/**
+ * Callback interface for classes that want to be told about index changes
+ * (namely document deletes) while they are working with a
+ * {@link ReadOnlyIndexReader}.
+ */
+public interface IndexListener {
+
+    /**
+     * Notifies this listener that the document identified by <code>id</code>
+     * has been deleted.
+     *
+     * @param id the <code>Term</code> that identifies the deleted document.
+     */
+    void documentDeleted(Term id);
+}

Propchange: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java?rev=264144&view=auto
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (added)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java Mon Aug 29 07:07:16 2005
@@ -0,0 +1,462 @@
+/*
+ * Copyright 2004-2005 The Apache Software Foundation or its licensors,
+ *                     as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.query.lucene;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexReader;
+import org.apache.commons.collections.Buffer;
+import org.apache.commons.collections.BufferUtils;
+import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.IOException;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+
+/**
+ * Merges indexes in a separate daemon thread.
+ */
+class IndexMerger extends Thread implements IndexListener {
+
+    /**
+     * Logger instance for this class.
+     */
+    private static final Logger log = Logger.getLogger(IndexMerger.class);
+
+    /**
+     * Marker task to signal the background thread to quit.
+     */
+    private static final Merge QUIT = new Merge(new Index[0]);
+
+    /**
+     * minMergeDocs config parameter.
+     */
+    private int minMergeDocs = SearchIndex.DEFAULT_MIN_MERGE_DOCS;
+
+    /**
+     * maxMergeDocs config parameter
+     */
+    private int maxMergeDocs = SearchIndex.DEFAULT_MAX_MERGE_DOCS;
+
+    /**
+     * mergeFactor config parameter
+     */
+    private int mergeFactor = SearchIndex.DEFAULT_MERGE_FACTOR;
+
+    /**
+     * Queue of merge Tasks
+     */
+    private final Buffer mergeTasks = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
+
+    /**
+     * List of id <code>Term</code> that identify documents that were deleted
+     * while a merge was running.
+     */
+    private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
+
+    /**
+     * List of <code>IndexBucket</code>s in ascending document limit.
+     */
+    private final List indexBuckets = new ArrayList();
+
+    /**
+     * The <code>MultiIndex</code> this index merger is working on.
+     */
+    private final MultiIndex multiIndex;
+
+    /**
+     * Monitor object to synchronize merge calculation.
+     */
+    private final Object lock = new Object();
+
+    /**
+     * Mutex that is acquired when replacing indexes on MultiIndex.
+     */
+    private final Sync indexReplacement = new Mutex();
+
+    /**
+     * Creates an <code>IndexMerger</code> for the given
+     * <code>MultiIndex</code>. The thread is named "IndexMerger" and flagged
+     * as daemon; this constructor does not start it.
+     *
+     * @param multiIndex the <code>MultiIndex</code>.
+     */
+    IndexMerger(MultiIndex multiIndex) {
+        super("IndexMerger");
+        setDaemon(true);
+        this.multiIndex = multiIndex;
+    }
+
+    /**
+     * Informs the index merger that an index was added / created. The index
+     * is put into the bucket that matches its document count. If that bucket
+     * allows merging and contains at least <code>mergeFactor</code> indexes,
+     * a merge task for the background thread is queued.
+     *
+     * @param name the name of the index.
+     * @param numDocs the number of documents it contains.
+     * @throws IllegalArgumentException if <code>numDocs</code> is negative.
+     */
+    void indexAdded(String name, int numDocs) {
+        if (numDocs < 0) {
+            throw new IllegalArgumentException("numDocs must not be negative");
+        }
+        // multiple threads may enter this method:
+        // - the background thread of this IndexMerger, when it replaces indexes
+        //   after a successful merge
+        // - a regular thread that updates the workspace
+        //
+        // therefore we have to synchronize this block
+        synchronized (lock) {
+            // initially create buckets
+            if (indexBuckets.size() == 0) {
+                // limits are calculated as long: an int multiplication
+                // overflows when maxMergeDocs is close to Integer.MAX_VALUE
+                long lower = 0;
+                long upper = minMergeDocs;
+                while (upper < maxMergeDocs) {
+                    // casts are safe, both values are below maxMergeDocs here
+                    indexBuckets.add(new IndexBucket((int) lower, (int) upper, true));
+                    lower = upper + 1;
+                    long next = upper * mergeFactor;
+                    if (next <= upper) {
+                        // limit does not grow (mergeFactor < 2 or upper == 0):
+                        // stop instead of creating buckets forever
+                        break;
+                    }
+                    upper = next;
+                }
+                // one with upper = maxMergeDocs
+                indexBuckets.add(new IndexBucket((int) lower, maxMergeDocs, false));
+                // and another one as overflow, just in case...
+                // (not needed if the previous bucket already ends at the
+                // largest possible int, maxMergeDocs + 1 would overflow)
+                if (maxMergeDocs < Integer.MAX_VALUE) {
+                    indexBuckets.add(new IndexBucket(maxMergeDocs + 1, Integer.MAX_VALUE, false));
+                }
+            }
+
+            // put index in bucket, the last bucket is used if none fits
+            IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1);
+            for (int i = 0; i < indexBuckets.size(); i++) {
+                bucket = (IndexBucket) indexBuckets.get(i);
+                if (bucket.fits(numDocs)) {
+                    break;
+                }
+            }
+            bucket.add(new Index(name, numDocs));
+
+            if (log.isDebugEnabled()) {
+                log.debug("index added: name=" + name + ", numDocs=" + numDocs);
+            }
+
+            // if bucket does not allow merge, we don't have to continue
+            if (!bucket.allowsMerge()) {
+                return;
+            }
+
+            // check if we need a merge
+            if (bucket.size() >= mergeFactor) {
+                long targetMergeDocs = bucket.upper;
+                targetMergeDocs = Math.min(targetMergeDocs * mergeFactor, maxMergeDocs);
+                // collect indexes until the target document count is exceeded
+                List indexesToMerge = new ArrayList();
+                long mergeDocs = 0;
+                for (Iterator it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs; ) {
+                    Index candidate = (Index) it.next();
+                    mergeDocs += candidate.numDocs;
+                    indexesToMerge.add(candidate);
+                }
+                if (indexesToMerge.size() > 2) {
+                    // found merge
+                    Index[] idxs = (Index[]) indexesToMerge.toArray(new Index[indexesToMerge.size()]);
+                    bucket.removeAll(indexesToMerge);
+                    if (log.isDebugEnabled()) {
+                        log.debug("requesting merge for " + indexesToMerge);
+                    }
+                    mergeTasks.add(new Merge(idxs));
+                    if (log.isDebugEnabled()) {
+                        log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @inheritDoc
+     */
+    public void documentDeleted(Term id) {
+        log.debug("document deleted: " + id.text());
+        deletedDocuments.add(id);
+    }
+
+    /**
+     * Signals this <code>IndexMerger</code> to stop and waits at most 500
+     * milliseconds for its thread to terminate. Pending merge tasks are
+     * discarded. The index replacement sync acquired here is not released by
+     * this method.
+     */
+    void dispose() {
+        log.debug("dispose IndexMerger");
+        try {
+            // take the mutex that guards index replacements
+            indexReplacement.acquire();
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while acquiring index replacement sync: " + e);
+            // go on and try to stop the merger without holding the sync
+        }
+
+        // drop whatever is still queued and enqueue the quit marker
+        mergeTasks.clear();
+        mergeTasks.add(QUIT);
+        log.debug("quit sent");
+
+        try {
+            // Grant the merger thread a short period to terminate. It may be
+            // busy merging a large index; in that case it is left alone and
+            // the daemon thread dies without finishing the merge. The merger
+            // only replaces indexes on the MultiIndex when it gets the
+            // indexReplacement sync immediately.
+            join(500);
+            if (!isAlive()) {
+                log.debug("IndexMerger thread stopped");
+            } else {
+                log.info("Unable to stop IndexMerger. Deamon is busy.");
+            }
+            log.debug("merge queue size: " + mergeTasks.size());
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while waiting for IndexMerger thread to terminate.");
+        }
+    }
+
+    /**
+     * Implements the index merging.
+     */
+    public void run() {
+        for (;;) {
+            Merge task = (Merge) mergeTasks.remove();
+            if (task == QUIT) {
+                break;
+            }
+
+            log.debug("accepted merge request");
+
+            // reset deleted documents
+            deletedDocuments.clear();
+
+            // get readers
+            String[] names = new String[task.indexes.length];
+            for (int i = 0; i < task.indexes.length; i++) {
+                names[i] = task.indexes[i].name;
+            }
+            try {
+                log.debug("create new index");
+                PersistentIndex index = multiIndex.createIndex();
+                boolean success = false;
+                try {
+
+                    log.debug("get index readers from MultiIndex");
+                    IndexReader[] readers = multiIndex.getIndexReaders(names, this);
+                    try {
+                        // do the merge
+                        long time = System.currentTimeMillis();
+                        index.addIndexes(readers);
+                        time = System.currentTimeMillis() - time;
+                        int docCount = 0;
+                        for (int i = 0; i < readers.length; i++) {
+                            docCount += readers[i].numDocs();
+                        }
+                        log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
+                    } finally {
+                        for (int i = 0; i < readers.length; i++) {
+                            try {
+                                readers[i].close();
+                            } catch (IOException e) {
+                                log.warn("Unable to close IndexReader: " + e);
+                            }
+                        }
+                    }
+
+                    // inform multi index
+                    // if we cannot get the sync immediately we have to quit
+                    if (!indexReplacement.attempt(0)) {
+                        log.debug("index merging canceled");
+                        break;
+                    }
+                    try {
+                        log.debug("replace indexes");
+                        multiIndex.replaceIndexes(names, index, deletedDocuments);
+                    } finally {
+                        indexReplacement.release();
+                    }
+
+                    success = true;
+
+                } finally {
+                    if (!success) {
+                        // delete index
+                        log.debug("deleting index " + index.getName());
+                        multiIndex.deleteIndex(index);
+                    }
+                }
+            } catch (Throwable e) {
+                log.error("Error while merging indexes: " + e);
+            }
+        }
+        log.info("IndexMerger terminated");
+    }
+
+    //-----------------------< merge properties >-------------------------------
+
+    /**
+     * Sets the merge factor. It is used as the multiplier between the upper
+     * document limits of consecutive index buckets and as the number of
+     * indexes a bucket must contain before a merge is requested.
+     *
+     * @param mergeFactor the merge factor.
+     */
+    public void setMergeFactor(int mergeFactor) {
+        this.mergeFactor = mergeFactor;
+    }
+
+
+    /**
+     * Sets the initial threshold for number of documents to merge to a new
+     * index. The value is used as the upper document limit of the first
+     * index bucket.
+     *
+     * @param minMergeDocs the initial document threshold.
+     */
+    public void setMinMergeDocs(int minMergeDocs) {
+        this.minMergeDocs = minMergeDocs;
+    }
+
+    /**
+     * Sets the maximum number of documents to merge. Index buckets that
+     * allow merging are only created for document limits below this value.
+     *
+     * @param maxMergeDocs the maximum number of documents to merge.
+     */
+    public void setMaxMergeDocs(int maxMergeDocs) {
+        this.maxMergeDocs = maxMergeDocs;
+    }
+
+    //------------------------------< internal >--------------------------------
+
+    /**
+     * Simple value holder for the name of an index and the number of
+     * documents it contains. <code>Index</code> instances are ordered by
+     * document count first and by name second.
+     */
+    private static final class Index implements Comparable {
+
+        /**
+         * The name of the index.
+         */
+        final String name;
+
+        /**
+         * The number of documents the index contains.
+         */
+        final int numDocs;
+
+        /**
+         * Creates a new index struct.
+         *
+         * @param name name of an index.
+         * @param numDocs number of documents it contains.
+         */
+        Index(String name, int numDocs) {
+            this.numDocs = numDocs;
+            this.name = name;
+        }
+
+        /**
+         * Compares by {@link #numDocs} first; indexes with the same number of
+         * documents are ordered by {@link #name}.
+         *
+         * @param o the other <code>Index</code>.
+         * @return a negative integer, zero, or a positive integer as this
+         *         Index is less than, equal to, or greater than the specified
+         *         Index.
+         */
+        public int compareTo(Object o) {
+            Index that = (Index) o;
+            if (numDocs < that.numDocs) {
+                return -1;
+            }
+            if (numDocs > that.numDocs) {
+                return 1;
+            }
+            // same document count, the name decides
+            return name.compareTo(that.name);
+        }
+
+        /**
+         * Returns the name and the document count separated by a colon.
+         *
+         * @return string representation of this index struct.
+         */
+        public String toString() {
+            return name + ':' + numDocs;
+        }
+    }
+
+    /**
+     * A merge task: a set of indexes that should be merged into a new index.
+     */
+    private static final class Merge {
+
+        /**
+         * The indexes to merge (private copy of the constructor argument).
+         */
+        final Index[] indexes;
+
+        /**
+         * Creates a merge task for <code>indexes</code>. The array is copied,
+         * later changes to the passed array do not affect this task.
+         *
+         * @param indexes the indexes to merge.
+         */
+        Merge(Index[] indexes) {
+            this.indexes = (Index[]) indexes.clone();
+        }
+    }
+
+    /**
+     * Implements a <code>List</code> with a document limit value. An
+     * <code>IndexBucket</code> contains {@link Index}es with documents less
+     * or equal the document limit of the bucket.
+     */
+    private static final class IndexBucket extends ArrayList {
+
+        /**
+         * The lower document limit.
+         */
+        private final int lower;
+
+        /**
+         * The upper document limit.
+         */
+        private final int upper;
+
+        /**
+         * Flag indicating if indexes in this bucket can be merged.
+         */
+        private final boolean allowMerge;
+
+        /**
+         * Creates a new <code>IndexBucket</code>. Limits are both inclusive.
+         *
+         * @param lower document limit.
+         * @param upper document limit.
+         * @param allowMerge if indexes in this bucket can be merged.
+         */
+        IndexBucket(int lower, int upper, boolean allowMerge) {
+            this.lower = lower;
+            this.upper = upper;
+            this.allowMerge = allowMerge;
+        }
+
+        /**
+         * Returns <code>true</code> if the number of documents fit in this
+         * <code>IndexBucket</code>; otherwise <code>false</code>
+         *
+         * @param numDocs the number of documents.
+         * @return <code>true</code> if <code>numDocs</code> fit.
+         */
+        boolean fits(int numDocs) {
+            return numDocs >= lower && numDocs <= upper;
+        }
+
+        /**
+         * Returns <code>true</code> if indexes in this bucket can be merged.
+         *
+         * @return <code>true</code> if indexes in this bucket can be merged.
+         */
+        boolean allowsMerge() {
+            return allowMerge;
+        }
+    }
+}

Propchange: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Mon Aug 29 07:07:16 2005
@@ -36,6 +36,11 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collection;
 
 /**
  * A <code>MultiIndex</code> consists of a {@link VolatileIndex} and multiple
@@ -55,6 +60,13 @@
  * {@link SearchIndex#setMergeFactor(int)} and {@link SearchIndex#setMinMergeDocs(int)}. For detailed
  * description of the configuration parameters see also the lucene
  * <code>IndexWriter</code> class.
+ * <p/>
+ * This class is thread-safe.
+ * <p/>
+ * Note on implementation: Multiple modifying threads are synchronized on a
+ * <code>MultiIndex</code> instance itself. Synchronization between a modifying
+ * thread and reader threads is done using {@link #updateMonitor} and
+ * {@link #updateInProgress}.
  */
 class MultiIndex {
 
@@ -136,12 +148,16 @@
     private boolean redoLogApplied = false;
 
     /**
-     * The last time this index was modified. That is, a document was added
-     * or removed.
+     * The last time this index was modified. That is, a document was added.
      */
     private long lastModificationTime;
 
     /**
+     * The <code>IndexMerger</code> for this <code>MultiIndex</code>.
+     */
+    private final IndexMerger merger;
+
+    /**
      * Timer to schedule commits of the volatile index after some idle time.
      */
     private final Timer commitTimer = new Timer(true);
@@ -180,6 +196,12 @@
         File mapFile = new File(indexDir, NS_MAPPING_FILE);
         nsMappings = new NamespaceMappings(mapFile);
 
+        // initialize IndexMerger
+        merger = new IndexMerger(this);
+        merger.setMaxMergeDocs(handler.getMaxMergeDocs());
+        merger.setMergeFactor(handler.getMergeFactor());
+        merger.setMinMergeDocs(handler.getMinMergeDocs());
+
         try {
             // open persistent indexes
             for (int i = 0; i < indexNames.size(); i++) {
@@ -194,6 +216,7 @@
                 index.setMinMergeDocs(handler.getMinMergeDocs());
                 index.setUseCompoundFile(handler.getUseCompoundFile());
                 indexes.add(index);
+                merger.indexAdded(index.getName(), index.getNumDocuments());
             }
 
             // create volatile index and check / apply redo log
@@ -222,16 +245,18 @@
                         deleteNodePersistent(entry.uuid);
                     }
                 }
-                maybeMergeIndexes();
                 log.warn("Redo changes applied.");
                 redoLog.clear();
                 redoLogApplied = true;
             }
 
             volatileIndex = new VolatileIndex(handler.getAnalyzer(), redoLog);
-            volatileIndex.setUseCompoundFile(false);
+            volatileIndex.setUseCompoundFile(handler.getUseCompoundFile());
             volatileIndex.setBufferSize(handler.getBufferSize());
 
+            // now that we are ready, start index merger
+            merger.start();
+
             if (doInitialIndex) {
                 // index root node
                 NodeState rootState = (NodeState) stateMgr.getItemState(new NodeId(rootUUID));
@@ -344,6 +369,125 @@
     }
 
     /**
+     * Obtains an <code>IndexReader</code> for every index of this
+     * <code>MultiIndex</code> whose name is contained in
+     * <code>indexNames</code>. The <code>listener</code> is passed on to each
+     * of these indexes when its reader is requested.
+     * <p/>
+     * Note: fewer <code>IndexReaders</code> than index names may be returned,
+     * because names that do not match an index of this
+     * <code>MultiIndex</code> are ignored. If a reader cannot be obtained,
+     * the readers obtained so far are closed and the listener is reset on
+     * their indexes before the exception is re-thrown.
+     *
+     * @param indexNames the names of the indexes for which to obtain readers.
+     * @param listener   the listener to notify when documents are deleted.
+     * @return the <code>IndexReaders</code>.
+     * @throws IOException if an error occurs acquiring the index readers.
+     */
+    synchronized IndexReader[] getIndexReaders(String[] indexNames, IndexListener listener)
+            throws IOException {
+        Set requested = new HashSet(Arrays.asList(indexNames));
+        // maps each obtained reader to the index it was obtained from
+        Map readerToIndex = new HashMap();
+
+        try {
+            for (int i = 0; i < indexes.size(); i++) {
+                PersistentIndex index = (PersistentIndex) indexes.get(i);
+                if (!requested.contains(index.getName())) {
+                    continue;
+                }
+                readerToIndex.put(index.getReadOnlyIndexReader(listener), index);
+            }
+        } catch (IOException e) {
+            // clean up: close readers obtained so far and reset the listener
+            for (Iterator it = readerToIndex.entrySet().iterator(); it.hasNext(); ) {
+                Map.Entry entry = (Map.Entry) it.next();
+                ReadOnlyIndexReader reader = (ReadOnlyIndexReader) entry.getKey();
+                try {
+                    reader.close();
+                } catch (IOException ex) {
+                    log.warn("Exception closing index reader: " + ex);
+                }
+                ((PersistentIndex) entry.getValue()).resetListener();
+            }
+            throw e;
+        }
+
+        Set readers = readerToIndex.keySet();
+        return (IndexReader[]) readers.toArray(new IndexReader[readers.size()]);
+    }
+
+    /**
+     * Creates a new <code>PersistentIndex</code> in a new index folder and
+     * configures it with the merge and compound file settings of the query
+     * handler. The new index is not registered with this
+     * <code>MultiIndex</code>.
+     *
+     * @return a new <code>PersistentIndex</code>.
+     * @throws IOException if a new index cannot be created.
+     */
+    synchronized PersistentIndex createIndex() throws IOException {
+        File folder = newIndexFolder();
+        // the index is named after its folder
+        PersistentIndex created = new PersistentIndex(folder.getName(), folder,
+                true, handler.getAnalyzer(), cache);
+        created.setMaxMergeDocs(handler.getMaxMergeDocs());
+        created.setMergeFactor(handler.getMergeFactor());
+        created.setMinMergeDocs(handler.getMinMergeDocs());
+        created.setUseCompoundFile(handler.getUseCompoundFile());
+        return created;
+    }
+
+    /**
+     * Replaces the indexes with names <code>obsoleteIndexes</code> with
+     * <code>index</code>. Documents that must be deleted in <code>index</code>
+     * can be identified with <code>Term</code>s in <code>deleted</code>.
+     *
+     * @param obsoleteIndexes the names of the indexes to replace.
+     * @param index      the new index that is the result of a merge of the
+     *                   indexes to replace.
+     * @param deleted    <code>Term</code>s that identify documents that must be
+     *                   deleted in <code>index</code>.
+     * @throws IOException if an exception occurs while replacing the indexes.
+     */
+    synchronized void replaceIndexes(String[] obsoleteIndexes,
+                                     PersistentIndex index,
+                                     Collection deleted)
+            throws IOException {
+        Set names = new HashSet(Arrays.asList(obsoleteIndexes));
+        // delete documents in index
+        for (Iterator it = deleted.iterator(); it.hasNext(); ) {
+            Term id = (Term) it.next();
+            int del = index.removeDocument(id);
+            log.error("deleted " + del + " document for id: " + id.text());
+        }
+        index.commit();
+
+        // now replace indexes
+        synchronized (updateMonitor) {
+            updateInProgress = true;
+        }
+        try {
+            for (Iterator it = indexes.iterator(); it.hasNext(); ) {
+                PersistentIndex idx = (PersistentIndex) it.next();
+                if (names.contains(idx.getName())) {
+                    it.remove();
+                    indexNames.removeName(idx.getName());
+                    idx.close();
+                    deleteIndex(idx);
+                }
+            }
+            // add new
+            indexes.add(index);
+            indexNames.addName(index.getName());
+            merger.indexAdded(index.getName(), index.getNumDocuments());
+            indexNames.write(indexDir);
+        } finally {
+            synchronized (updateMonitor) {
+                updateInProgress = false;
+                updateMonitor.notifyAll();
+                if (multiReader != null) {
+                    multiReader.close();
+                    multiReader = null;
+                }
+            }
+        }
+    }
+
+    /**
      * Returns an read-only <code>IndexReader</code> that spans alls indexes of this
      * <code>MultiIndex</code>.
      *
@@ -383,29 +527,37 @@
     /**
      * Closes this <code>MultiIndex</code>.
      */
-    synchronized void close() {
-        // stop timer
-        commitTimer.cancel();
+    void close() {
+
+        // stop index merger
+        // when calling this method we must not lock this MultiIndex, otherwise
+        // a deadlock might occur
+        merger.dispose();
+
+        synchronized (this) {
+            // stop timer
+            commitTimer.cancel();
 
-        // commit / close indexes
-        if (multiReader != null) {
+            // commit / close indexes
+            if (multiReader != null) {
+                try {
+                    multiReader.close();
+                } catch (IOException e) {
+                    log.error("Exception while closing search index.", e);
+                }
+                multiReader = null;
+            }
             try {
-                multiReader.close();
+                if (volatileIndex.getRedoLog().hasEntries()) {
+                    commit();
+                }
             } catch (IOException e) {
                 log.error("Exception while closing search index.", e);
             }
-            multiReader = null;
-        }
-        try {
-            if (volatileIndex.getRedoLog().hasEntries()) {
-                commit();
+            volatileIndex.close();
+            for (int i = 0; i < indexes.size(); i++) {
+                ((PersistentIndex) indexes.get(i)).close();
             }
-        } catch (IOException e) {
-            log.error("Exception while closing search index.", e);
-        }
-        volatileIndex.close();
-        for (int i = 0; i < indexes.size(); i++) {
-            ((PersistentIndex) indexes.get(i)).close();
         }
     }
 
@@ -437,6 +589,29 @@
         return redoLogApplied;
     }
 
+    /**
+     * Deletes the <code>index</code>. If the index directory cannot be removed
+     * because (Windows) file handles are still open, the directory is marked
+     * for future deletion.
+     * <p/>
+     * This method does not close the index, but rather expects that the index
+     * has already been closed.
+     *
+     * @param index the index to delete.
+     */
+    void deleteIndex(PersistentIndex index) {
+        File dir = new File(indexDir, index.getName());
+        if (!deleteIndex(dir)) {
+            // try again later
+            deletable.addName(index.getName());
+        }
+        try {
+            deletable.write(indexDir);
+        } catch (IOException e) {
+            log.warn("Exception while writing deletable indexes: " + e);
+        }
+    }
+
     //-------------------------< internal >-------------------------------------
 
     /**
@@ -502,21 +677,32 @@
             index.setMergeFactor(handler.getMergeFactor());
             index.setMinMergeDocs(handler.getMinMergeDocs());
             index.setUseCompoundFile(handler.getUseCompoundFile());
-            index.mergeIndex(volatileIndex);
+            index.copyIndex(volatileIndex);
 
             // if merge has been successful add index
             indexes.add(index);
             indexNames.addName(name);
             indexNames.write(indexDir);
 
+            merger.indexAdded(index.getName(), index.getNumDocuments());
+
             // check if obsolete indexes can be deleted
             // todo move to other place?
             attemptDelete();
         }
 
         // commit persistent indexes
-        for (int i = 0; i < indexes.size(); i++) {
-            ((PersistentIndex) indexes.get(i)).commit();
+        for (int i = indexes.size() - 1; i >= 0; i--) {
+            PersistentIndex index = (PersistentIndex) indexes.get(i);
+            index.commit();
+            // check if index still contains documents
+            if (index.getNumDocuments() == 0) {
+                indexes.remove(i);
+                indexNames.removeName(index.getName());
+                indexNames.write(indexDir);
+                index.close();
+                deleteIndex(index);
+            }
         }
 
         // reset redo log
@@ -524,10 +710,9 @@
 
         // create new volatile index
         volatileIndex = new VolatileIndex(handler.getAnalyzer(), volatileIndex.getRedoLog());
-        volatileIndex.setUseCompoundFile(false);
+        volatileIndex.setUseCompoundFile(handler.getUseCompoundFile());
         volatileIndex.setBufferSize(handler.getBufferSize());
 
-        maybeMergeIndexes();
     }
 
     /**
@@ -609,106 +794,6 @@
                 break;
             }
         }
-    }
-
-    /**
-     * Merges multiple persistent index into a single one according to the
-     * properties: {@link SearchIndex#setMaxMergeDocs(int)}, {@link
-     * SearchIndex#setMergeFactor(int)} and {@link SearchIndex#setMinMergeDocs(int)}.
-     *
-     * @throws IOException if an error occurs during the merge.
-     */
-    private void maybeMergeIndexes() throws IOException {
-        // remove unused indexes
-        for (int i = indexes.size() - 1; i >= 0; i--) {
-            PersistentIndex index = (PersistentIndex) indexes.get(i);
-            if (!index.hasDocuments()) {
-                indexes.remove(i);
-                indexNames.removeName(index.getName());
-                indexNames.write(indexDir);
-                index.close();
-                File dir = new File(indexDir, index.getName());
-                if (!deleteIndex(dir)) {
-                    // try again later
-                    deletable.addName(index.getName());
-                    deletable.write(indexDir);
-                }
-            }
-        }
-
-        // only check for merge if there are more than mergeFactor indexes
-        if (indexes.size() >= handler.getMergeFactor()) {
-            long targetMergeDocs = handler.getMinMergeDocs();
-            while (targetMergeDocs <= handler.getMaxMergeDocs()) {
-                // find index smaller or equal than current target size
-                int minIndex = indexes.size();
-                int mergeDocs = 0;
-                while (--minIndex >= 0) {
-                    PersistentIndex index = (PersistentIndex) indexes.get(minIndex);
-                    int numDocs = index.getIndexReader().numDocs();
-                    if (numDocs > targetMergeDocs) {
-                        break;
-                    }
-                    mergeDocs += numDocs;
-                }
-
-                if (indexes.size() - (minIndex + 1) >= handler.getMergeFactor()
-                        && mergeDocs < handler.getMaxMergeDocs()) {
-                    // found a merge to do
-                    mergeIndex(minIndex + 1);
-                } else {
-                    break;
-                }
-                // increase target size
-                targetMergeDocs *= handler.getMergeFactor();
-            }
-        }
-    }
-
-    /**
-     * Merges indexes <code>indexes.get(i)</code> to <code>indexes.get(indexes.size()
-     * - 1)</code> into a new persistent index.
-     *
-     * @param min the min position inside the indexes list.
-     * @throws IOException if an error occurs while merging.
-     */
-    private void mergeIndex(int min) throws IOException {
-        // create new index
-        File sub = newIndexFolder();
-        String name = sub.getName();
-        PersistentIndex index = new PersistentIndex(name, sub, true,
-                handler.getAnalyzer(), cache);
-        index.setMaxMergeDocs(handler.getMaxMergeDocs());
-        index.setMergeFactor(handler.getMergeFactor());
-        index.setMinMergeDocs(handler.getMinMergeDocs());
-        index.setUseCompoundFile(handler.getUseCompoundFile());
-
-        // the indexes to merge
-        List toMerge = indexes.subList(min, indexes.size());
-        IndexReader[] readers = new IndexReader[toMerge.size()];
-        for (int i = 0; i < toMerge.size(); i++) {
-            readers[i] = ((PersistentIndex) toMerge.get(i)).getIndexReader();
-        }
-        // do the merge
-        index.getIndexWriter().addIndexes(readers);
-        index.getIndexWriter().optimize();
-        // close and remove obsolete indexes
-
-        for (int i = indexes.size() - 1; i >= min; i--) {
-            PersistentIndex pi = (PersistentIndex) indexes.get(i);
-            pi.close();
-            File dir = new File(indexDir, pi.getName());
-            if (!deleteIndex(dir)) {
-                // try again later
-                deletable.addName(pi.getName());
-            }
-            indexNames.removeName(pi.getName());
-            indexes.remove(i);
-        }
-        indexNames.addName(name);
-        indexes.add(index);
-        indexNames.write(indexDir);
-        deletable.write(indexDir);
     }
 
     /**

Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java Mon Aug 29 07:07:16 2005
@@ -20,7 +20,10 @@
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.InputStream;
+import org.apache.lucene.store.OutputStream;
 
 import java.io.IOException;
 import java.io.File;
@@ -47,6 +50,12 @@
     private boolean lockEncountered = false;
 
     /**
+     * If non <code>null</code>, <code>listener</code> needs to be informed
+     * when a document is deleted.
+     */
+    private IndexListener listener;
+
+    /**
      * Creates a new <code>PersistentIndex</code> based on the file system
      * <code>indexDir</code>.
      * @param name the name of this index.
@@ -86,6 +95,17 @@
     }
 
     /**
+     * {@inheritDoc}
+     */
+    int removeDocument(Term idTerm) throws IOException {
+        int num = super.removeDocument(idTerm);
+        if (num > 0 && listener != null) {
+            listener.documentDeleted(idTerm);
+        }
+        return num;
+    }
+
+    /**
      * Returns <code>true</code> if this index encountered a lock on the file
      * system during startup. This indicates a unclean shutdown.
      *
@@ -115,23 +135,87 @@
     }
 
     /**
-     * Returns <code>true</code> if this index has valid documents. Returns
-     * <code>false</code> if all documents are deleted, or the index does not
-     * contain any documents.
-     * @return
-     * @throws IOException
-     */
-    boolean hasDocuments() throws IOException {
-        if (getIndexReader().numDocs() == 0) {
-            return false;
-        }
-        IndexReader reader = getIndexReader();
-        for (int i = 0; i < reader.maxDoc(); i++) {
-            if (!reader.isDeleted(i)) {
-                return true;
+     * Merges the provided indexes into this index. After this completes, the
+     * index is optimized.
+     * <p/>
+     * The provided IndexReaders are not closed.
+     *
+     * @param readers the readers of indexes to add.
+     * @throws IOException if an error occurs while adding indexes.
+     */
+    void addIndexes(IndexReader[] readers) throws IOException {
+        getIndexWriter().addIndexes(readers);
+        getIndexWriter().optimize();
     }
+
+    /**
+     * Copies <code>index</code> into this persistent index. This method should
+     * only be called when <code>this</code> index is empty; otherwise the
+     * behaviour is undefined.
+     *
+     * @param index the index to copy from.
+     * @throws IOException if an error occurs while copying.
+     */
+    void copyIndex(AbstractIndex index) throws IOException {
+        // commit changes to directory on other index.
+        index.commit(true);
+        // simply copy over the files
+        byte[] buffer = new byte[1024];
+        Directory dir = index.getDirectory();
+        Directory dest = getDirectory();
+        String[] files = dir.list();
+        for (int i = 0; i < files.length; i++) {
+            InputStream in = dir.openFile(files[i]);
+            try {
+                OutputStream out = dest.createFile(files[i]);
+                try {
+                    long remaining = in.length();
+                    while (remaining > 0) {
+                        int num = (int) Math.min(remaining, buffer.length);
+                        in.readBytes(buffer, 0, num);
+                        out.writeBytes(buffer, num);
+                        remaining -= num;
+                    }
+                } finally {
+                    out.close();
+                }
+            } finally {
+                in.close();
+            }
         }
-        return false;
+    }
+
+    /**
+     * Returns a <code>ReadOnlyIndexReader</code> and registers
+     * <code>listener</code> to send notifications when documents are deleted on
+     * <code>this</code> index.
+     *
+     * @param listener the listener to notify when documents are deleted.
+     * @return a <code>ReadOnlyIndexReader</code>.
+     * @throws IOException if the reader cannot be obtained.
+     */
+    synchronized ReadOnlyIndexReader getReadOnlyIndexReader(IndexListener listener)
+            throws IOException {
+        ReadOnlyIndexReader reader = getReadOnlyIndexReader();
+        this.listener = listener;
+        return reader;
+    }
+
+    /**
+     * Removes a potentially registered {@link IndexListener}.
+     */
+    synchronized void resetListener() {
+        this.listener = null;
+    }
+
+    /**
+     * Returns the number of documents in this persistent index.
+     *
+     * @return the number of documents in this persistent index.
+     * @throws IOException if an error occurs while reading from the index.
+     */
+    int getNumDocuments() throws IOException {
+        return getIndexReader().numDocs();
     }
 
     /**

Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java Mon Aug 29 07:07:16 2005
@@ -92,6 +92,24 @@
     }
 
     /**
+     * Returns <code>true</code> if any documents have been deleted.
+     *
+     * @return <code>true</code> if any documents have been deleted.
+     */
+    public boolean hasDeletions() {
+        return !deleted.isEmpty();
+    }
+
+    /**
+     * Returns the number of documents in this index reader.
+     *
+     * @return the number of documents in this index reader.
+     */
+    public int numDocs() {
+        return maxDoc() - deleted.cardinality();
+    }
+
+    /**
      * @exception UnsupportedOperationException always
      */
     final protected void doDelete(int docNum) {

Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java Mon Aug 29 07:07:16 2005
@@ -55,6 +55,21 @@
     private static final Logger log = Logger.getLogger(SearchIndex.class);
 
     /**
+     * The default value for property {@link #minMergeDocs}.
+     */
+    public static final int DEFAULT_MIN_MERGE_DOCS = 100;
+
+    /**
+     * The default value for property {@link #maxMergeDocs}.
+     */
+    public static final int DEFAULT_MAX_MERGE_DOCS = 100000;
+
+    /**
+     * The default value for property {@link #mergeFactor}.
+     */
+    public static final int DEFAULT_MERGE_FACTOR = 10;
+
+    /**
      * The actual index
      */
     private MultiIndex index;
@@ -74,7 +89,7 @@
     /**
      * minMergeDocs config parameter.
      */
-    private int minMergeDocs = 100;
+    private int minMergeDocs = DEFAULT_MIN_MERGE_DOCS;
 
     /**
      * volatileIdleTime config parameter.
@@ -84,12 +99,12 @@
     /**
      * maxMergeDocs config parameter
      */
-    private int maxMergeDocs = 100000;
+    private int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS;
 
     /**
      * mergeFactor config parameter
      */
-    private int mergeFactor = 10;
+    private int mergeFactor = DEFAULT_MERGE_FACTOR;
 
     /**
      * Number of documents that are buffered before they are added to the index.
@@ -262,9 +277,9 @@
      * to this handler.
      */
     public void close() {
-        log.info("Closing search index.");
         index.close();
         getContext().destroy();
+        log.info("Search index closed.");
     }
 
     /**