You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2005/08/29 16:07:24 UTC
svn commit: r264144 - in
/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene:
AbstractIndex.java IndexListener.java IndexMerger.java MultiIndex.java
PersistentIndex.java ReadOnlyIndexReader.java SearchIndex.java
Author: mreutegg
Date: Mon Aug 29 07:07:16 2005
New Revision: 264144
URL: http://svn.apache.org/viewcvs?rev=264144&view=rev
Log:
JCR-197: Index merging should run in a separate thread
Added:
incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java (with props)
incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (with props)
Modified:
incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java
incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java
incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/AbstractIndex.java Mon Aug 29 07:07:16 2005
@@ -171,7 +171,7 @@
* @return a read-only index reader.
* @throws IOException if an error occurs while obtaining the index reader.
*/
- protected synchronized ReadOnlyIndexReader getReadOnlyIndexReader()
+ synchronized ReadOnlyIndexReader getReadOnlyIndexReader()
throws IOException {
// get current modifiable index reader
IndexReader modifiableReader = getIndexReader();
@@ -190,7 +190,6 @@
return new ReadOnlyIndexReader(sharedReader, deleted);
}
-
/**
* Returns an <code>IndexWriter</code> on this index.
* @return an <code>IndexWriter</code> on this index.
@@ -217,7 +216,18 @@
* Commits all pending changes to the underlying <code>Directory</code>.
* @throws IOException if an error occurs while committing changes.
*/
- protected synchronized void commit() throws IOException {
+ protected void commit() throws IOException {
+ commit(false);
+ }
+
+ /**
+ * Commits all pending changes to the underlying <code>Directory</code>.
+ *
+ * @param optimize if <code>true</code> the index is optimized after
+ * the commit.
+ * @throws IOException if an error occurs while committing changes.
+ */
+ protected synchronized void commit(boolean optimize) throws IOException {
// if index is not locked there are no pending changes
if (!IndexReader.isLocked(getDirectory())) {
return;
@@ -230,6 +240,11 @@
log.debug("committing IndexWriter.");
indexWriter.close();
indexWriter = null;
+ }
+ // optimize if requested
+ if (optimize) {
+ IndexWriter writer = getIndexWriter();
+ writer.optimize();
}
}
Added: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java?rev=264144&view=auto
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java (added)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java Mon Aug 29 07:07:16 2005
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2004-2005 The Apache Software Foundation or its licensors,
+ * as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.query.lucene;
+
+import org.apache.lucene.index.Term;
+
+/**
+ * Defines an interface that allows implementing classes to listen for index
+ * changes (namely document deletes) while using a {@link ReadOnlyIndexReader}.
+ */
+public interface IndexListener {
+
+ /**
+ * Informs this listener, that the document with <code>id</code> has been
+ * deleted.
+ *
+ * @param id the <code>Term</code> that identifies the deleted document.
+ */
+ public void documentDeleted(Term id);
+}
Propchange: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java?rev=264144&view=auto
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java (added)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java Mon Aug 29 07:07:16 2005
@@ -0,0 +1,462 @@
+/*
+ * Copyright 2004-2005 The Apache Software Foundation or its licensors,
+ * as applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.query.lucene;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.IndexReader;
+import org.apache.commons.collections.Buffer;
+import org.apache.commons.collections.BufferUtils;
+import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.IOException;
+
+import EDU.oswego.cs.dl.util.concurrent.Sync;
+import EDU.oswego.cs.dl.util.concurrent.Mutex;
+
+/**
+ * Merges indexes in a separate daemon thread.
+ */
+class IndexMerger extends Thread implements IndexListener {
+
+ /**
+ * Logger instance for this class.
+ */
+ private static final Logger log = Logger.getLogger(IndexMerger.class);
+
+ /**
+ * Marker task to signal the background thread to quit.
+ */
+ private static final Merge QUIT = new Merge(new Index[0]);
+
+ /**
+ * minMergeDocs config parameter.
+ */
+ private int minMergeDocs = SearchIndex.DEFAULT_MIN_MERGE_DOCS;
+
+ /**
+ * maxMergeDocs config parameter
+ */
+ private int maxMergeDocs = SearchIndex.DEFAULT_MAX_MERGE_DOCS;
+
+ /**
+ * mergeFactor config parameter
+ */
+ private int mergeFactor = SearchIndex.DEFAULT_MERGE_FACTOR;
+
+ /**
+ * Queue of merge Tasks
+ */
+ private final Buffer mergeTasks = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
+
+ /**
+ * List of id <code>Term</code> that identify documents that were deleted
+ * while a merge was running.
+ */
+ private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
+
+ /**
+ * List of <code>IndexBucket</code>s in ascending document limit.
+ */
+ private final List indexBuckets = new ArrayList();
+
+ /**
+ * The <code>MultiIndex</code> this index merger is working on.
+ */
+ private final MultiIndex multiIndex;
+
+ /**
+ * Monitor object to synchronize merge calculation.
+ */
+ private final Object lock = new Object();
+
+ /**
+ * Mutex that is acquired when replacing indexes on MultiIndex.
+ */
+ private final Sync indexReplacement = new Mutex();
+
+ /**
+ * Creates an <code>IndexMerger</code>.
+ *
+ * @param multiIndex the <code>MultiIndex</code>.
+ */
+ IndexMerger(MultiIndex multiIndex) {
+ this.multiIndex = multiIndex;
+ setName("IndexMerger");
+ setDaemon(true);
+ }
+
+ /**
+ * Informs the index merger that an index was added / created.
+ *
+ * @param name the name of the index.
+ * @param numDocs the number of documents it contains.
+ */
+ void indexAdded(String name, int numDocs) {
+ if (numDocs < 0) {
+ throw new IllegalArgumentException("numDocs must be positive");
+ }
+ // multiple threads may enter this method:
+ // - the background thread of this IndexMerger, when it replaces indexes
+ // after a successful merge
+ // - a regular thread that updates the workspace
+ //
+ // therefore we have to synchronize this block
+ synchronized (lock) {
+ // initially create buckets
+ if (indexBuckets.size() == 0) {
+ int lower = 0;
+ int upper = minMergeDocs;
+ while (upper < maxMergeDocs) {
+ indexBuckets.add(new IndexBucket(lower, upper, true));
+ lower = upper + 1;
+ upper *= mergeFactor;
+ }
+ // one with upper = maxMergeDocs
+ indexBuckets.add(new IndexBucket(lower, maxMergeDocs, false));
+ // and another one as overflow, just in case...
+ indexBuckets.add(new IndexBucket(maxMergeDocs + 1, Integer.MAX_VALUE, false));
+ }
+
+ // put index in bucket
+ IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1);
+ for (int i = 0; i < indexBuckets.size(); i++) {
+ bucket = (IndexBucket) indexBuckets.get(i);
+ if (bucket.fits(numDocs)) {
+ break;
+ }
+ }
+ bucket.add(new Index(name, numDocs));
+
+ if (log.isDebugEnabled()) {
+ log.debug("index added: name=" + name + ", numDocs=" + numDocs);
+ }
+
+ // if bucket does not allow merge, we don't have to continue
+ if (!bucket.allowsMerge()) {
+ return;
+ }
+
+ // check if we need a merge
+ if (bucket.size() >= mergeFactor) {
+ long targetMergeDocs = bucket.upper;
+ targetMergeDocs = Math.min(targetMergeDocs * mergeFactor, maxMergeDocs);
+ // sum up docs in bucket
+ List indexesToMerge = new ArrayList();
+ int mergeDocs = 0;
+ for (Iterator it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs; ) {
+ indexesToMerge.add(it.next());
+ }
+ if (indexesToMerge.size() > 2) {
+ // found merge
+ Index[] idxs = (Index[]) indexesToMerge.toArray(new Index[indexesToMerge.size()]);
+ bucket.removeAll(indexesToMerge);
+ if (log.isDebugEnabled()) {
+ log.debug("requesting merge for " + indexesToMerge);
+ }
+ mergeTasks.add(new Merge(idxs));
+ log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
+ }
+ }
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public void documentDeleted(Term id) {
+ log.debug("document deleted: " + id.text());
+ deletedDocuments.add(id);
+ }
+
+ /**
+ * Signals this <code>IndexMerger</code> to stop and waits until it
+ * has terminated.
+ */
+ void dispose() {
+ log.debug("dispose IndexMerger");
+ // get mutex for index replacements
+ try {
+ indexReplacement.acquire();
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while acquiring index replacement sync: " + e);
+ // try to stop IndexMerger without the sync
+ }
+
+ // clear task queue
+ mergeTasks.clear();
+
+ // send quit
+ mergeTasks.add(QUIT);
+ log.debug("quit sent");
+
+ try {
+ // give the merger thread some time to quit,
+ // it is possible that the merger is busy working on a large index.
+ // if that is the case we will just ignore it and the daemon will
+ // die without being able to finish the merge.
+ // at this point it is not possible anymore to replace indexes
+ // on the MultiIndex because we hold the indexReplacement Sync.
+ this.join(500);
+ if (isAlive()) {
+ log.info("Unable to stop IndexMerger. Deamon is busy.");
+ } else {
+ log.debug("IndexMerger thread stopped");
+ }
+ log.debug("merge queue size: " + mergeTasks.size());
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for IndexMerger thread to terminate.");
+ }
+ }
+
+ /**
+ * Implements the index merging.
+ */
+ public void run() {
+ for (;;) {
+ Merge task = (Merge) mergeTasks.remove();
+ if (task == QUIT) {
+ break;
+ }
+
+ log.debug("accepted merge request");
+
+ // reset deleted documents
+ deletedDocuments.clear();
+
+ // get readers
+ String[] names = new String[task.indexes.length];
+ for (int i = 0; i < task.indexes.length; i++) {
+ names[i] = task.indexes[i].name;
+ }
+ try {
+ log.debug("create new index");
+ PersistentIndex index = multiIndex.createIndex();
+ boolean success = false;
+ try {
+
+ log.debug("get index readers from MultiIndex");
+ IndexReader[] readers = multiIndex.getIndexReaders(names, this);
+ try {
+ // do the merge
+ long time = System.currentTimeMillis();
+ index.addIndexes(readers);
+ time = System.currentTimeMillis() - time;
+ int docCount = 0;
+ for (int i = 0; i < readers.length; i++) {
+ docCount += readers[i].numDocs();
+ }
+ log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
+ } finally {
+ for (int i = 0; i < readers.length; i++) {
+ try {
+ readers[i].close();
+ } catch (IOException e) {
+ log.warn("Unable to close IndexReader: " + e);
+ }
+ }
+ }
+
+ // inform multi index
+ // if we cannot get the sync immediately we have to quit
+ if (!indexReplacement.attempt(0)) {
+ log.debug("index merging canceled");
+ break;
+ }
+ try {
+ log.debug("replace indexes");
+ multiIndex.replaceIndexes(names, index, deletedDocuments);
+ } finally {
+ indexReplacement.release();
+ }
+
+ success = true;
+
+ } finally {
+ if (!success) {
+ // delete index
+ log.debug("deleting index " + index.getName());
+ multiIndex.deleteIndex(index);
+ }
+ }
+ } catch (Throwable e) {
+ log.error("Error while merging indexes: " + e);
+ }
+ }
+ log.info("IndexMerger terminated");
+ }
+
+ //-----------------------< merge properties >-------------------------------
+
+ /**
+ * The merge factor.
+ */
+ public void setMergeFactor(int mergeFactor) {
+ this.mergeFactor = mergeFactor;
+ }
+
+
+ /**
+ * The initial threshold for number of documents to merge to a new index.
+ */
+ public void setMinMergeDocs(int minMergeDocs) {
+ this.minMergeDocs = minMergeDocs;
+ }
+
+ /**
+ * The maximum number of document to merge.
+ */
+ public void setMaxMergeDocs(int maxMergeDocs) {
+ this.maxMergeDocs = maxMergeDocs;
+ }
+
+ //------------------------------< internal >--------------------------------
+
+ /**
+ * Implements a simple struct that holds the name of an index and how
* many documents it contains. <code>Index</code> is comparable using the
+ * number of documents it contains.
+ */
+ private static final class Index implements Comparable {
+
+ /**
+ * The name of the index.
+ */
+ final String name;
+
+ /**
+ * The number of documents the index contains.
+ */
+ final int numDocs;
+
+ /**
+ * Creates a new index struct.
+ *
+ * @param name name of an index.
+ * @param numDocs number of documents it contains.
+ */
+ Index(String name, int numDocs) {
+ this.name = name;
+ this.numDocs = numDocs;
+ }
+
+ /**
+ * Indexes are first ordered by {@link #numDocs} and then by {@link
+ * #name}.
+ *
+ * @param o the other <code>Index</code>.
+ * @return a negative integer, zero, or a positive integer as this
+ * Index is less than, equal to, or greater than the specified
+ * Index.
+ */
+ public int compareTo(Object o) {
+ Index other = (Index) o;
+ int val = numDocs < other.numDocs ? -1 : (numDocs == other.numDocs ? 0 : 1);
+ if (val != 0) {
+ return val;
+ } else {
+ return name.compareTo(other.name);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public String toString() {
+ return name + ":" + numDocs;
+ }
+ }
+
+ /**
+ * Defines a merge task, to merge a couple of indexes into a new index.
+ */
+ private static final class Merge {
+
+ final Index[] indexes;
+
+ /**
+ * Merge task, to merge <code>indexes</code> into a new index with
+ * <code>name</code>.
+ *
+ * @param indexes the indexes to merge.
+ */
+ Merge(Index[] indexes) {
+ this.indexes = new Index[indexes.length];
+ System.arraycopy(indexes, 0, this.indexes, 0, indexes.length);
+ }
+ }
+
+ /**
+ * Implements a <code>List</code> with a document limit value. An
+ * <code>IndexBucket</code> contains {@link Index}es with documents less
+ * or equal the document limit of the bucket.
+ */
+ private static final class IndexBucket extends ArrayList {
+
+ /**
+ * The lower document limit.
+ */
+ private final int lower;
+
+ /**
+ * The upper document limit.
+ */
+ private final int upper;
+
+ /**
+ * Flag indicating if indexes in this bucket can be merged.
+ */
+ private final boolean allowMerge;
+
+ /**
+ * Creates a new <code>IndexBucket</code>. Limits are both inclusive.
+ *
+ * @param lower document limit.
+ * @param upper document limit.
+ * @param allowMerge if indexes in this bucket can be merged.
+ */
+ IndexBucket(int lower, int upper, boolean allowMerge) {
+ this.lower = lower;
+ this.upper = upper;
+ this.allowMerge = allowMerge;
+ }
+
+ /**
+ * Returns <code>true</code> if the number of documents fit in this
+ * <code>IndexBucket</code>; otherwise <code>false</code>
+ *
+ * @param numDocs the number of documents.
+ * @return <code>true</code> if <code>numDocs</code> fit.
+ */
+ boolean fits(int numDocs) {
+ return numDocs >= lower && numDocs <= upper;
+ }
+
+ /**
+ * Returns <code>true</code> if indexes in this bucket can be merged.
+ *
+ * @return <code>true</code> if indexes in this bucket can be merged.
+ */
+ boolean allowsMerge() {
+ return allowMerge;
+ }
+ }
+}
Propchange: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/IndexMerger.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/MultiIndex.java Mon Aug 29 07:07:16 2005
@@ -36,6 +36,11 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collection;
/**
* A <code>MultiIndex</code> consists of a {@link VolatileIndex} and multiple
@@ -55,6 +60,13 @@
* {@link SearchIndex#setMergeFactor(int)} and {@link SearchIndex#setMinMergeDocs(int)}. For detailed
* description of the configuration parameters see also the lucene
* <code>IndexWriter</code> class.
+ * <p/>
+ * This class is thread-safe.
+ * <p/>
+ * Note on implementation: Multiple modifying threads are synchronized on a
+ * <code>MultiIndex</code> instance itself. Synchronization between a modifying
+ * thread and reader threads is done using {@link #updateMonitor} and
+ * {@link #updateInProgress}.
*/
class MultiIndex {
@@ -136,12 +148,16 @@
private boolean redoLogApplied = false;
/**
- * The last time this index was modified. That is, a document was added
- * or removed.
+ * The last time this index was modified. That is, a document was added.
*/
private long lastModificationTime;
/**
+ * The <code>IndexMerger</code> for this <code>MultiIndex</code>.
+ */
+ private final IndexMerger merger;
+
+ /**
* Timer to schedule commits of the volatile index after some idle time.
*/
private final Timer commitTimer = new Timer(true);
@@ -180,6 +196,12 @@
File mapFile = new File(indexDir, NS_MAPPING_FILE);
nsMappings = new NamespaceMappings(mapFile);
+ // initialize IndexMerger
+ merger = new IndexMerger(this);
+ merger.setMaxMergeDocs(handler.getMaxMergeDocs());
+ merger.setMergeFactor(handler.getMergeFactor());
+ merger.setMinMergeDocs(handler.getMinMergeDocs());
+
try {
// open persistent indexes
for (int i = 0; i < indexNames.size(); i++) {
@@ -194,6 +216,7 @@
index.setMinMergeDocs(handler.getMinMergeDocs());
index.setUseCompoundFile(handler.getUseCompoundFile());
indexes.add(index);
+ merger.indexAdded(index.getName(), index.getNumDocuments());
}
// create volatile index and check / apply redo log
@@ -222,16 +245,18 @@
deleteNodePersistent(entry.uuid);
}
}
- maybeMergeIndexes();
log.warn("Redo changes applied.");
redoLog.clear();
redoLogApplied = true;
}
volatileIndex = new VolatileIndex(handler.getAnalyzer(), redoLog);
- volatileIndex.setUseCompoundFile(false);
+ volatileIndex.setUseCompoundFile(handler.getUseCompoundFile());
volatileIndex.setBufferSize(handler.getBufferSize());
+ // now that we are ready, start index merger
+ merger.start();
+
if (doInitialIndex) {
// index root node
NodeState rootState = (NodeState) stateMgr.getItemState(new NodeId(rootUUID));
@@ -344,6 +369,125 @@
}
/**
+ * Returns <code>IndexReader</code>s for the indexes named
+ * <code>indexNames</code>. An <code>IndexListener</code> is registered and
+ * notified when documents are deleted from one of the indexes in
+ * <code>indexNames</code>.
+ * <p/>
+ * Note: the number of <code>IndexReaders</code> returned by this method is
+ * not necessarily the same as the number of index names passed. An index
+ * might have been deleted and is not reachable anymore.
+ *
+ * @param indexNames the names of the indexes for which to obtain readers.
+ * @param listener the listener to notify when documents are deleted.
+ * @return the <code>IndexReaders</code>.
+ * @throws IOException if an error occurs acquiring the index readers.
+ */
+ synchronized IndexReader[] getIndexReaders(String[] indexNames, IndexListener listener)
+ throws IOException {
+ Set names = new HashSet(Arrays.asList(indexNames));
+ Map indexReaders = new HashMap();
+
+ try {
+ for (Iterator it = indexes.iterator(); it.hasNext(); ) {
+ PersistentIndex index = (PersistentIndex) it.next();
+ if (names.contains(index.getName())) {
+ indexReaders.put(index.getReadOnlyIndexReader(listener), index);
+ }
+ }
+ } catch (IOException e) {
+ // close readers obtained so far
+ for (Iterator it = indexReaders.keySet().iterator(); it.hasNext(); ) {
+ ReadOnlyIndexReader reader = (ReadOnlyIndexReader) it.next();
+ try {
+ reader.close();
+ } catch (IOException ex) {
+ log.warn("Exception closing index reader: " + ex);
+ }
+ ((PersistentIndex) indexReaders.get(reader)).resetListener();
+ }
+ throw e;
+ }
+
+ return (IndexReader[]) indexReaders.keySet().toArray(new IndexReader[indexReaders.size()]);
+ }
+
+ /**
+ * Creates a new Persistent index. The new index is not registered with this
+ * <code>MultiIndex</code>.
+ *
+ * @return a new <code>PersistentIndex</code>.
+ * @throws IOException if a new index cannot be created.
+ */
+ synchronized PersistentIndex createIndex() throws IOException {
+ File sub = newIndexFolder();
+ String name = sub.getName();
+ PersistentIndex index = new PersistentIndex(name, sub, true,
+ handler.getAnalyzer(), cache);
+ index.setMaxMergeDocs(handler.getMaxMergeDocs());
+ index.setMergeFactor(handler.getMergeFactor());
+ index.setMinMergeDocs(handler.getMinMergeDocs());
+ index.setUseCompoundFile(handler.getUseCompoundFile());
+ return index;
+ }
+
+ /**
+ * Replaces the indexes with names <code>obsoleteIndexes</code> with
+ * <code>index</code>. Documents that must be deleted in <code>index</code>
+ * can be identified with <code>Term</code>s in <code>deleted</code>.
+ *
+ * @param obsoleteIndexes the names of the indexes to replace.
+ * @param index the new index that is the result of a merge of the
+ * indexes to replace.
+ * @param deleted <code>Term</code>s that identify documents that must be
+ * deleted in <code>index</code>.
+ * @throws IOException if an exception occurs while replacing the indexes.
+ */
+ synchronized void replaceIndexes(String[] obsoleteIndexes,
+ PersistentIndex index,
+ Collection deleted)
+ throws IOException {
+ Set names = new HashSet(Arrays.asList(obsoleteIndexes));
+ // delete documents in index
+ for (Iterator it = deleted.iterator(); it.hasNext(); ) {
+ Term id = (Term) it.next();
+ int del = index.removeDocument(id);
+ log.error("deleted " + del + " document for id: " + id.text());
+ }
+ index.commit();
+
+ // now replace indexes
+ synchronized (updateMonitor) {
+ updateInProgress = true;
+ }
+ try {
+ for (Iterator it = indexes.iterator(); it.hasNext(); ) {
+ PersistentIndex idx = (PersistentIndex) it.next();
+ if (names.contains(idx.getName())) {
+ it.remove();
+ indexNames.removeName(idx.getName());
+ idx.close();
+ deleteIndex(idx);
+ }
+ }
+ // add new
+ indexes.add(index);
+ indexNames.addName(index.getName());
+ merger.indexAdded(index.getName(), index.getNumDocuments());
+ indexNames.write(indexDir);
+ } finally {
+ synchronized (updateMonitor) {
+ updateInProgress = false;
+ updateMonitor.notifyAll();
+ if (multiReader != null) {
+ multiReader.close();
+ multiReader = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns a read-only <code>IndexReader</code> that spans all indexes of this
* <code>MultiIndex</code>.
*
@@ -383,29 +527,37 @@
/**
* Closes this <code>MultiIndex</code>.
*/
- synchronized void close() {
- // stop timer
- commitTimer.cancel();
+ void close() {
+
+ // stop index merger
+ // when calling this method we must not lock this MultiIndex, otherwise
+ // a deadlock might occur
+ merger.dispose();
+
+ synchronized (this) {
+ // stop timer
+ commitTimer.cancel();
- // commit / close indexes
- if (multiReader != null) {
+ // commit / close indexes
+ if (multiReader != null) {
+ try {
+ multiReader.close();
+ } catch (IOException e) {
+ log.error("Exception while closing search index.", e);
+ }
+ multiReader = null;
+ }
try {
- multiReader.close();
+ if (volatileIndex.getRedoLog().hasEntries()) {
+ commit();
+ }
} catch (IOException e) {
log.error("Exception while closing search index.", e);
}
- multiReader = null;
- }
- try {
- if (volatileIndex.getRedoLog().hasEntries()) {
- commit();
+ volatileIndex.close();
+ for (int i = 0; i < indexes.size(); i++) {
+ ((PersistentIndex) indexes.get(i)).close();
}
- } catch (IOException e) {
- log.error("Exception while closing search index.", e);
- }
- volatileIndex.close();
- for (int i = 0; i < indexes.size(); i++) {
- ((PersistentIndex) indexes.get(i)).close();
}
}
@@ -437,6 +589,29 @@
return redoLogApplied;
}
+ /**
+ * Deletes the <code>index</code>. If the index directory cannot be removed
+ * because (windows) file handles are still open, the directory is marked
+ * for future deletion.
+ * <p/>
+ * This method does not close the index, but rather expects that the index
+ * has already been closed.
+ *
+ * @param index the index to delete.
+ */
+ void deleteIndex(PersistentIndex index) {
+ File dir = new File(indexDir, index.getName());
+ if (!deleteIndex(dir)) {
+ // try again later
+ deletable.addName(index.getName());
+ }
+ try {
+ deletable.write(indexDir);
+ } catch (IOException e) {
+ log.warn("Exception while writing deletable indexes: " + e);
+ }
+ }
+
//-------------------------< internal >-------------------------------------
/**
@@ -502,21 +677,32 @@
index.setMergeFactor(handler.getMergeFactor());
index.setMinMergeDocs(handler.getMinMergeDocs());
index.setUseCompoundFile(handler.getUseCompoundFile());
- index.mergeIndex(volatileIndex);
+ index.copyIndex(volatileIndex);
// if merge has been successful add index
indexes.add(index);
indexNames.addName(name);
indexNames.write(indexDir);
+ merger.indexAdded(index.getName(), index.getNumDocuments());
+
// check if obsolete indexes can be deleted
// todo move to other place?
attemptDelete();
}
// commit persistent indexes
- for (int i = 0; i < indexes.size(); i++) {
- ((PersistentIndex) indexes.get(i)).commit();
+ for (int i = indexes.size() - 1; i >= 0; i--) {
+ PersistentIndex index = (PersistentIndex) indexes.get(i);
+ index.commit();
+ // check if index still contains documents
+ if (index.getNumDocuments() == 0) {
+ indexes.remove(i);
+ indexNames.removeName(index.getName());
+ indexNames.write(indexDir);
+ index.close();
+ deleteIndex(index);
+ }
}
// reset redo log
@@ -524,10 +710,9 @@
// create new volatile index
volatileIndex = new VolatileIndex(handler.getAnalyzer(), volatileIndex.getRedoLog());
- volatileIndex.setUseCompoundFile(false);
+ volatileIndex.setUseCompoundFile(handler.getUseCompoundFile());
volatileIndex.setBufferSize(handler.getBufferSize());
- maybeMergeIndexes();
}
/**
@@ -609,106 +794,6 @@
break;
}
}
- }
-
- /**
- * Merges multiple persistent index into a single one according to the
- * properties: {@link SearchIndex#setMaxMergeDocs(int)}, {@link
- * SearchIndex#setMergeFactor(int)} and {@link SearchIndex#setMinMergeDocs(int)}.
- *
- * @throws IOException if an error occurs during the merge.
- */
- private void maybeMergeIndexes() throws IOException {
- // remove unused indexes
- for (int i = indexes.size() - 1; i >= 0; i--) {
- PersistentIndex index = (PersistentIndex) indexes.get(i);
- if (!index.hasDocuments()) {
- indexes.remove(i);
- indexNames.removeName(index.getName());
- indexNames.write(indexDir);
- index.close();
- File dir = new File(indexDir, index.getName());
- if (!deleteIndex(dir)) {
- // try again later
- deletable.addName(index.getName());
- deletable.write(indexDir);
- }
- }
- }
-
- // only check for merge if there are more than mergeFactor indexes
- if (indexes.size() >= handler.getMergeFactor()) {
- long targetMergeDocs = handler.getMinMergeDocs();
- while (targetMergeDocs <= handler.getMaxMergeDocs()) {
- // find index smaller or equal than current target size
- int minIndex = indexes.size();
- int mergeDocs = 0;
- while (--minIndex >= 0) {
- PersistentIndex index = (PersistentIndex) indexes.get(minIndex);
- int numDocs = index.getIndexReader().numDocs();
- if (numDocs > targetMergeDocs) {
- break;
- }
- mergeDocs += numDocs;
- }
-
- if (indexes.size() - (minIndex + 1) >= handler.getMergeFactor()
- && mergeDocs < handler.getMaxMergeDocs()) {
- // found a merge to do
- mergeIndex(minIndex + 1);
- } else {
- break;
- }
- // increase target size
- targetMergeDocs *= handler.getMergeFactor();
- }
- }
- }
-
- /**
- * Merges indexes <code>indexes.get(i)</code> to <code>indexes.get(indexes.size()
- * - 1)</code> into a new persistent index.
- *
- * @param min the min position inside the indexes list.
- * @throws IOException if an error occurs while merging.
- */
- private void mergeIndex(int min) throws IOException {
- // create new index
- File sub = newIndexFolder();
- String name = sub.getName();
- PersistentIndex index = new PersistentIndex(name, sub, true,
- handler.getAnalyzer(), cache);
- index.setMaxMergeDocs(handler.getMaxMergeDocs());
- index.setMergeFactor(handler.getMergeFactor());
- index.setMinMergeDocs(handler.getMinMergeDocs());
- index.setUseCompoundFile(handler.getUseCompoundFile());
-
- // the indexes to merge
- List toMerge = indexes.subList(min, indexes.size());
- IndexReader[] readers = new IndexReader[toMerge.size()];
- for (int i = 0; i < toMerge.size(); i++) {
- readers[i] = ((PersistentIndex) toMerge.get(i)).getIndexReader();
- }
- // do the merge
- index.getIndexWriter().addIndexes(readers);
- index.getIndexWriter().optimize();
- // close and remove obsolete indexes
-
- for (int i = indexes.size() - 1; i >= min; i--) {
- PersistentIndex pi = (PersistentIndex) indexes.get(i);
- pi.close();
- File dir = new File(indexDir, pi.getName());
- if (!deleteIndex(dir)) {
- // try again later
- deletable.addName(pi.getName());
- }
- indexNames.removeName(pi.getName());
- indexes.remove(i);
- }
- indexNames.addName(name);
- indexes.add(index);
- indexNames.write(indexDir);
- deletable.write(indexDir);
}
/**
Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/PersistentIndex.java Mon Aug 29 07:07:16 2005
@@ -20,7 +20,10 @@
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.InputStream;
+import org.apache.lucene.store.OutputStream;
import java.io.IOException;
import java.io.File;
@@ -47,6 +50,12 @@
private boolean lockEncountered = false;
/**
+ * If non-<code>null</code>, <code>listener</code> needs to be informed
+ * when a document is deleted.
+ */
+ private IndexListener listener;
+
+ /**
* Creates a new <code>PersistentIndex</code> based on the file system
* <code>indexDir</code>.
* @param name the name of this index.
@@ -86,6 +95,17 @@
}
/**
+ * {@inheritDoc}
+ */
+ int removeDocument(Term idTerm) throws IOException {
+ int num = super.removeDocument(idTerm);
+ if (num > 0 && listener != null) {
+ listener.documentDeleted(idTerm);
+ }
+ return num;
+ }
+
+ /**
* Returns <code>true</code> if this index encountered a lock on the file
* system during startup. This indicates an unclean shutdown.
*
@@ -115,23 +135,87 @@
}
/**
- * Returns <code>true</code> if this index has valid documents. Returns
- * <code>false</code> if all documents are deleted, or the index does not
- * contain any documents.
- * @return
- * @throws IOException
- */
- boolean hasDocuments() throws IOException {
- if (getIndexReader().numDocs() == 0) {
- return false;
- }
- IndexReader reader = getIndexReader();
- for (int i = 0; i < reader.maxDoc(); i++) {
- if (!reader.isDeleted(i)) {
- return true;
+ * Merges the provided indexes into this index. After this completes, the
+ * index is optimized.
+ * <p/>
+ * The provided IndexReaders are not closed.
+ *
+ * @param readers the readers of indexes to add.
+ * @throws IOException if an error occurs while adding indexes.
+ */
+ void addIndexes(IndexReader[] readers) throws IOException {
+ getIndexWriter().addIndexes(readers);
+ getIndexWriter().optimize();
}
+
+ /**
+ * Copies <code>index</code> into this persistent index. This method should
+ * only be called when <code>this</code> index is empty otherwise the
+ * behaviour is undefined.
+ *
+ * @param index the index to copy from.
+ * @throws IOException if an error occurs while copying.
+ */
+ void copyIndex(AbstractIndex index) throws IOException {
+ // commit changes to directory on other index.
+ index.commit(true);
+ // simply copy over the files
+ byte[] buffer = new byte[1024];
+ Directory dir = index.getDirectory();
+ Directory dest = getDirectory();
+ String[] files = dir.list();
+ for (int i = 0; i < files.length; i++) {
+ InputStream in = dir.openFile(files[i]);
+ try {
+ OutputStream out = dest.createFile(files[i]);
+ try {
+ long remaining = in.length();
+ while (remaining > 0) {
+ int num = (int) Math.min(remaining, buffer.length);
+ in.readBytes(buffer, 0, num);
+ out.writeBytes(buffer, num);
+ remaining -= num;
+ }
+ } finally {
+ out.close();
+ }
+ } finally {
+ in.close();
+ }
}
- return false;
+ }
+
+ /**
+ * Returns a <code>ReadOnlyIndexReader</code> and registers
+ * <code>listener</code> to send notifications when documents are deleted on
+ * <code>this</code> index.
+ *
+ * @param listener the listener to notify when documents are deleted.
+ * @return a <code>ReadOnlyIndexReader</code>.
+ * @throws IOException if the reader cannot be obtained.
+ */
+ synchronized ReadOnlyIndexReader getReadOnlyIndexReader(IndexListener listener)
+ throws IOException {
+ ReadOnlyIndexReader reader = getReadOnlyIndexReader();
+ this.listener = listener;
+ return reader;
+ }
+
+ /**
+ * Removes a potentially registered {@link IndexListener}.
+ */
+ synchronized void resetListener() {
+ this.listener = null;
+ }
+
+ /**
+ * Returns the number of documents in this persistent index.
+ *
+ * @return the number of documents in this persistent index.
+ * @throws IOException if an error occurs while reading from the index.
+ */
+ int getNumDocuments() throws IOException {
+ return getIndexReader().numDocs();
}
/**
Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/ReadOnlyIndexReader.java Mon Aug 29 07:07:16 2005
@@ -92,6 +92,24 @@
}
/**
+ * Returns <code>true</code> if any documents have been deleted.
+ *
+ * @return <code>true</code> if any documents have been deleted.
+ */
+ public boolean hasDeletions() {
+ return !deleted.isEmpty();
+ }
+
+ /**
+ * Returns the number of documents in this index reader.
+ *
+ * @return the number of documents in this index reader.
+ */
+ public int numDocs() {
+ return maxDoc() - deleted.cardinality();
+ }
+
+ /**
* @exception UnsupportedOperationException always
*/
final protected void doDelete(int docNum) {
Modified: incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java
URL: http://svn.apache.org/viewcvs/incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java?rev=264144&r1=264143&r2=264144&view=diff
==============================================================================
--- incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java (original)
+++ incubator/jackrabbit/trunk/core/src/java/org/apache/jackrabbit/core/query/lucene/SearchIndex.java Mon Aug 29 07:07:16 2005
@@ -55,6 +55,21 @@
private static final Logger log = Logger.getLogger(SearchIndex.class);
/**
+ * The default value for property {@link #minMergeDocs}.
+ */
+ public static final int DEFAULT_MIN_MERGE_DOCS = 100;
+
+ /**
+ * The default value for property {@link #maxMergeDocs}.
+ */
+ public static final int DEFAULT_MAX_MERGE_DOCS = 100000;
+
+ /**
+ * the default value for property {@link #mergeFactor}.
+ */
+ public static final int DEFAULT_MERGE_FACTOR = 10;
+
+ /**
* The actual index
*/
private MultiIndex index;
@@ -74,7 +89,7 @@
/**
* minMergeDocs config parameter.
*/
- private int minMergeDocs = 100;
+ private int minMergeDocs = DEFAULT_MIN_MERGE_DOCS;
/**
* volatileIdleTime config parameter.
@@ -84,12 +99,12 @@
/**
* maxMergeDocs config parameter
*/
- private int maxMergeDocs = 100000;
+ private int maxMergeDocs = DEFAULT_MAX_MERGE_DOCS;
/**
* mergeFactor config parameter
*/
- private int mergeFactor = 10;
+ private int mergeFactor = DEFAULT_MERGE_FACTOR;
/**
* Number of documents that are buffered before they are added to the index.
@@ -262,9 +277,9 @@
* to this handler.
*/
public void close() {
- log.info("Closing search index.");
index.close();
getContext().destroy();
+ log.info("Search index closed.");
}
/**