You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2012/06/12 11:40:24 UTC
svn commit: r1349214 - in /lucene/dev/trunk/lucene:
core/src/java/org/apache/lucene/index/
core/src/java/org/apache/lucene/search/
facet/src/java/org/apache/lucene/facet/taxonomy/directory/
facet/src/java/org/apache/lucene/facet/taxonomy/writercache/ f...
Author: shaie
Date: Tue Jun 12 09:40:23 2012
New Revision: 1349214
URL: http://svn.apache.org/viewvc?rev=1349214&view=rev
Log:
LUCENE-4061: fix concurrency issues in DirectoryTaxonomyWriter
Added:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java (with props)
Modified:
lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java
lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/TaxonomyWriterCache.java
lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/cl2o/Cl2oTaxonomyWriterCache.java
lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/LruTaxonomyWriterCache.java
lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/NameIntCacheLRU.java
lucene/dev/trunk/lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestDirectoryTaxonomyWriter.java
Added: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java?rev=1349214&view=auto
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java (added)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/index/ReaderManager.java Tue Jun 12 09:40:23 2012
@@ -0,0 +1,85 @@
+package org.apache.lucene.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.store.Directory;
+
+/**
+ * Utility class to safely share {@link DirectoryReader} instances across
+ * multiple threads, while periodically reopening. This class ensures each
+ * reader is closed only once all threads have finished using it.
+ *
+ * @see SearcherManager
+ *
+ * @lucene.experimental
+ */
+public final class ReaderManager extends ReferenceManager<DirectoryReader> {
+
+ /**
+ * Creates a new ReaderManager whose initial reader is opened from the given
+ * {@link IndexWriter}.
+ *
+ * @param writer
+ * the IndexWriter to open the DirectoryReader from.
+ * @param applyAllDeletes
+ * If <code>true</code>, all buffered deletes will be applied (made
+ * visible) in the {@link IndexSearcher} / {@link DirectoryReader}.
+ * If <code>false</code>, the deletes may or may not be applied, but
+ * remain buffered (in IndexWriter) so that they will be applied in
+ * the future. Applying deletes can be costly, so if your app can
+ * tolerate deleted documents being returned you might gain some
+ * performance by passing <code>false</code>. See
+ * {@link DirectoryReader#openIfChanged(DirectoryReader, IndexWriter, boolean)}.
+ *
+ * @throws IOException if the initial DirectoryReader cannot be opened.
+ */
+ public ReaderManager(IndexWriter writer, boolean applyAllDeletes) throws IOException {
+ current = DirectoryReader.open(writer, applyAllDeletes);
+ }
+
+ /**
+ * Creates a new ReaderManager whose initial reader is opened on the given {@link Directory}.
+ * @param dir the directory to open the DirectoryReader on.
+ *
+ * @throws IOException if the initial DirectoryReader cannot be opened.
+ */
+ public ReaderManager(Directory dir) throws IOException {
+ current = DirectoryReader.open(dir);
+ }
+
+ @Override
+ protected void decRef(DirectoryReader reference) throws IOException {
+ reference.decRef();
+ }
+
+ @Override
+ protected DirectoryReader refreshIfNeeded(DirectoryReader referenceToRefresh) throws IOException {
+ return DirectoryReader.openIfChanged(referenceToRefresh);
+ }
+
+ @Override
+ protected boolean tryIncRef(DirectoryReader reference) {
+ return reference.tryIncRef();
+ }
+
+}
Modified: lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java?rev=1349214&r1=1349213&r2=1349214&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java (original)
+++ lucene/dev/trunk/lucene/core/src/java/org/apache/lucene/search/ReferenceManager.java Tue Jun 12 09:40:23 2012
@@ -162,7 +162,7 @@ public abstract class ReferenceManager<G
public final boolean maybeRefresh() throws IOException {
ensureOpen();
- // Ensure only 1 thread does reopen at once; other threads just return immediately:
+ // Ensure only 1 thread does refresh at once; other threads just return immediately:
final boolean doTryRefresh = refreshLock.tryLock();
if (doTryRefresh) {
try {
@@ -189,7 +189,7 @@ public abstract class ReferenceManager<G
public final void maybeRefreshBlocking() throws IOException, InterruptedException {
ensureOpen();
- // Ensure only 1 thread does reopen at once
+ // Ensure only 1 thread does refresh at once
refreshLock.lock();
try {
doMaybeRefresh();
Modified: lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java?rev=1349214&r1=1349213&r2=1349214&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java (original)
+++ lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/directory/DirectoryTaxonomyWriter.java Tue Jun 12 09:40:23 2012
@@ -11,6 +11,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
@@ -36,6 +37,7 @@ import org.apache.lucene.index.IndexWrit
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.LogByteSizeMergePolicy;
+import org.apache.lucene.index.ReaderManager;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
@@ -93,15 +95,22 @@ public class DirectoryTaxonomyWriter imp
*/
public static final String INDEX_CREATE_TIME = "index.create.time";
- private Directory dir;
- private IndexWriter indexWriter;
+ private final Directory dir;
+ private final IndexWriter indexWriter;
+ private final TaxonomyWriterCache cache;
+ private final AtomicInteger cacheMisses = new AtomicInteger(0);
+
+ /** Records the taxonomy index creation time, updated on replaceTaxonomy as well. */
+ private String createTime;
+
private int nextID;
private char delimiter = Consts.DEFAULT_DELIMITER;
private SinglePositionTokenStream parentStream = new SinglePositionTokenStream(Consts.PAYLOAD_PARENT);
private Field parentStreamField;
private Field fullPathField;
-
- private TaxonomyWriterCache cache;
+ private int cacheMissesUntilFill = 11;
+ private boolean shouldFillCache = true;
+
/**
* We call the cache "complete" if we know that every category in our
* taxonomy is in the cache. When the cache is <B>not</B> complete, and
@@ -112,13 +121,12 @@ public class DirectoryTaxonomyWriter imp
* categories, or if a put() to the cache ever returned true (meaning
* that some of the cached data was cleared).
*/
- private boolean cacheIsComplete;
- private DirectoryReader reader;
- private int cacheMisses;
+ private volatile boolean cacheIsComplete;
+ private volatile ReaderManager readerManager;
+ private volatile boolean shouldRefreshReaderManager;
+ private volatile boolean isClosed = false;
+ private volatile ParentArray parentArray;
- /** Records the taxonomy index creation time, updated on replaceTaxonomy as well. */
- private String createTime;
-
/** Reads the commit data from a Directory. */
private static Map<String, String> readCommitData(Directory dir) throws IOException {
SegmentInfos infos = new SegmentInfos();
@@ -213,8 +221,6 @@ public class DirectoryTaxonomyWriter imp
assert !(indexWriter.getConfig().getMergePolicy() instanceof TieredMergePolicy) :
"for preserving category docids, merging none-adjacent segments is not allowed";
- reader = null;
-
FieldType ft = new FieldType(TextField.TYPE_NOT_STORED);
ft.setOmitNorms(true);
parentStreamField = new Field(Consts.FIELD_PAYLOADS, parentStream, ft);
@@ -232,17 +238,15 @@ public class DirectoryTaxonomyWriter imp
// Make sure that the taxonomy always contain the root category
// with category id 0.
addCategory(new CategoryPath());
- refreshInternalReader();
} else {
// There are some categories on the disk, which we have not yet
// read into the cache, and therefore the cache is incomplete.
- // We chose not to read all the categories into the cache now,
+ // We choose not to read all the categories into the cache now,
// to avoid terrible performance when a taxonomy index is opened
// to add just a single category. We will do it later, after we
// notice a few cache misses.
cacheIsComplete = false;
}
- cacheMisses = 0;
}
/**
@@ -289,18 +293,21 @@ public class DirectoryTaxonomyWriter imp
new LogByteSizeMergePolicy());
}
- /** Opens a {@link DirectoryReader} from the internal {@link IndexWriter}. */
- private synchronized void openInternalReader() throws IOException {
- // verify that the taxo-writer hasn't been closed on us. the method is
- // synchronized since it may be called from a non sync'ed block, and it
- // needs to protect against close() happening concurrently.
- ensureOpen();
- assert reader == null : "a reader is already open !";
- reader = DirectoryReader.open(indexWriter, false);
+ /** Opens a {@link ReaderManager} from the internal {@link IndexWriter}. */
+ private void initReaderManager() throws IOException {
+ if (readerManager == null) {
+ synchronized (this) {
+ // verify that the taxo-writer hasn't been closed on us.
+ ensureOpen();
+ if (readerManager == null) {
+ readerManager = new ReaderManager(indexWriter, false);
+ }
+ }
+ }
}
/**
- * Creates a new instance with a default cached as defined by
+ * Creates a new instance with a default cache as defined by
* {@link #defaultTaxonomyWriterCache()}.
*/
public DirectoryTaxonomyWriter(Directory directory, OpenMode openMode)
@@ -335,7 +342,7 @@ public class DirectoryTaxonomyWriter imp
*/
@Override
public synchronized void close() throws CorruptIndexException, IOException {
- if (indexWriter != null) {
+ if (!isClosed) {
indexWriter.commit(combinedCommitData(null));
doClose();
}
@@ -343,7 +350,7 @@ public class DirectoryTaxonomyWriter imp
private void doClose() throws CorruptIndexException, IOException {
indexWriter.close();
- indexWriter = null;
+ isClosed = true;
closeResources();
}
@@ -355,13 +362,12 @@ public class DirectoryTaxonomyWriter imp
* <code>super.closeResources()</code> call in your implementation.
*/
protected synchronized void closeResources() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
+ if (readerManager != null) {
+ readerManager.close();
+ readerManager = null;
}
if (cache != null) {
cache.close();
- cache = null;
}
}
@@ -371,52 +377,48 @@ public class DirectoryTaxonomyWriter imp
* category does not yet exist in the taxonomy.
*/
protected int findCategory(CategoryPath categoryPath) throws IOException {
- // If we can find the category in our cache, we can return the
- // response directly from it:
+ // If we can find the category in the cache, or we know the cache is
+ // complete, we can return the response directly from it
int res = cache.get(categoryPath);
- if (res >= 0) {
+ if (res >= 0 || cacheIsComplete) {
return res;
}
- // If we know that the cache is complete, i.e., contains every category
- // which exists, we can return -1 immediately. However, if the cache is
- // not complete, we need to check the disk.
- if (cacheIsComplete) {
- return -1;
- }
- cacheMisses++;
+
+ cacheMisses.incrementAndGet();
// After a few cache misses, it makes sense to read all the categories
// from disk and into the cache. The reason not to do this on the first
// cache miss (or even when opening the writer) is that it will
// significantly slow down the case when a taxonomy is opened just to
// add one category. The idea only spending a long time on reading
- // after enough time was spent on cache misses is known as a "online
+ // after enough time was spent on cache misses is known as an "online
// algorithm".
- if (perhapsFillCache()) {
- return cache.get(categoryPath);
+ perhapsFillCache();
+ res = cache.get(categoryPath);
+ if (res >= 0 || cacheIsComplete) {
+ // if after filling the cache from the info on disk, the category is in it
+ // or the cache is complete, return whatever cache.get returned.
+ return res;
}
- // We need to get an answer from the on-disk index. If a reader
- // is not yet open, do it now:
- if (reader == null) {
- openInternalReader();
- }
+ // We need to get an answer from the on-disk index.
+ initReaderManager();
- int base = 0;
int doc = -1;
- for (AtomicReader r : reader.getSequentialSubReaders()) {
- DocsEnum docs = r.termDocsEnum(null, Consts.FULL,
- new BytesRef(categoryPath.toString(delimiter)), false);
- if (docs != null) {
- doc = docs.nextDoc() + base;
- break;
+ DirectoryReader reader = readerManager.acquire();
+ try {
+ int base = 0;
+ for (AtomicReader r : reader.getSequentialSubReaders()) {
+ DocsEnum docs = r.termDocsEnum(null, Consts.FULL,
+ new BytesRef(categoryPath.toString(delimiter)), false);
+ if (docs != null) {
+ doc = docs.nextDoc() + base;
+ break;
+ }
+ base += r.maxDoc(); // we don't have deletions, so it's ok to call maxDoc
}
- base += r.maxDoc(); // we don't have deletions, so it's ok to call maxDoc
+ } finally {
+ readerManager.release(reader);
}
- // Note: we do NOT add to the cache the fact that the category
- // does not exist. The reason is that our only use for this
- // method is just before we actually add this category. If
- // in the future this usage changes, we should consider caching
- // the fact that the category is not in the taxonomy.
if (doc > 0) {
addToCache(categoryPath, doc);
}
@@ -431,30 +433,34 @@ public class DirectoryTaxonomyWriter imp
private int findCategory(CategoryPath categoryPath, int prefixLen)
throws IOException {
int res = cache.get(categoryPath, prefixLen);
- if (res >= 0) {
+ if (res >= 0 || cacheIsComplete) {
return res;
}
- if (cacheIsComplete) {
- return -1;
- }
- cacheMisses++;
- if (perhapsFillCache()) {
- return cache.get(categoryPath, prefixLen);
- }
- if (reader == null) {
- openInternalReader();
+
+ cacheMisses.incrementAndGet();
+ perhapsFillCache();
+ res = cache.get(categoryPath, prefixLen);
+ if (res >= 0 || cacheIsComplete) {
+ return res;
}
+
+ initReaderManager();
- int base = 0;
int doc = -1;
- for (AtomicReader r : reader.getSequentialSubReaders()) {
- DocsEnum docs = r.termDocsEnum(null, Consts.FULL,
- new BytesRef(categoryPath.toString(delimiter, prefixLen)), false);
- if (docs != null) {
- doc = docs.nextDoc() + base;
- break;
+ DirectoryReader reader = readerManager.acquire();
+ try {
+ int base = 0;
+ for (AtomicReader r : reader.getSequentialSubReaders()) {
+ DocsEnum docs = r.termDocsEnum(null, Consts.FULL,
+ new BytesRef(categoryPath.toString(delimiter, prefixLen)), false);
+ if (docs != null) {
+ doc = docs.nextDoc() + base;
+ break;
+ }
+ base += r.maxDoc(); // we don't have deletions, so it's ok to call maxDoc
}
- base += r.maxDoc(); // we don't have deletions, so it's ok to call maxDoc
+ } finally {
+ readerManager.release(reader);
}
if (doc > 0) {
@@ -526,7 +532,7 @@ public class DirectoryTaxonomyWriter imp
* {@link AlreadyClosedException} if it is.
*/
protected final void ensureOpen() {
- if (indexWriter == null) {
+ if (isClosed) {
throw new AlreadyClosedException("The taxonomy writer has already been closed");
}
}
@@ -560,7 +566,10 @@ public class DirectoryTaxonomyWriter imp
int id = nextID++;
addToCache(categoryPath, length, id);
-
+
+ // added a category document, mark that ReaderManager is not up-to-date
+ shouldRefreshReaderManager = true;
+
// also add to the parent array
getParentArray().add(id, parent);
@@ -598,24 +607,18 @@ public class DirectoryTaxonomyWriter imp
if (returned) {
return false;
}
- returned = true;
- return true;
+ return returned = true;
}
}
private void addToCache(CategoryPath categoryPath, int id) throws IOException {
if (cache.put(categoryPath, id)) {
// If cache.put() returned true, it means the cache was limited in
- // size, became full, so parts of it had to be cleared.
- // Unfortunately we don't know which part was cleared - it is
- // possible that a relatively-new category that hasn't yet been
- // committed to disk (and therefore isn't yet visible in our
- // "reader") was deleted from the cache, and therefore we must
- // now refresh the reader.
- // Because this is a slow operation, cache implementations are
- // expected not to delete entries one-by-one but rather in bulk
- // (LruTaxonomyWriterCache removes the 2/3rd oldest entries).
- refreshInternalReader();
+ // size, became full, and parts of it had to be evicted. It is
+ // possible that a relatively-new category that isn't yet visible
+ // to our 'reader' was evicted, and therefore we must now refresh
+ // the reader.
+ refreshReaderManager();
cacheIsComplete = false;
}
}
@@ -623,18 +626,22 @@ public class DirectoryTaxonomyWriter imp
private void addToCache(CategoryPath categoryPath, int prefixLen, int id)
throws IOException {
if (cache.put(categoryPath, prefixLen, id)) {
- refreshInternalReader();
+ refreshReaderManager();
cacheIsComplete = false;
}
}
- private synchronized void refreshInternalReader() throws IOException {
- if (reader != null) {
- DirectoryReader r2 = DirectoryReader.openIfChanged(reader);
- if (r2 != null) {
- reader.close();
- reader = r2;
- }
+ private synchronized void refreshReaderManager() throws IOException {
+ // this method is synchronized since it cannot happen concurrently with
+ // addCategoryDocument -- when this method returns, we must know that the
+ // reader manager's state is current. It also sets shouldRefreshReaderManager to
+ // false, which must not overlap with addCategoryDocument either.
+ // NOTE: since this method is sync'ed, it can call maybeRefresh, instead of
+ // maybeRefreshBlocking. If ever this is changed, make sure to change the
+ // call too.
+ if (shouldRefreshReaderManager && readerManager != null) {
+ readerManager.maybeRefresh();
+ shouldRefreshReaderManager = false;
}
}
@@ -648,7 +655,6 @@ public class DirectoryTaxonomyWriter imp
public synchronized void commit() throws CorruptIndexException, IOException {
ensureOpen();
indexWriter.commit(combinedCommitData(null));
- refreshInternalReader();
}
/**
@@ -674,7 +680,6 @@ public class DirectoryTaxonomyWriter imp
public synchronized void commit(Map<String,String> commitUserData) throws CorruptIndexException, IOException {
ensureOpen();
indexWriter.commit(combinedCommitData(commitUserData));
- refreshInternalReader();
}
/**
@@ -714,8 +719,6 @@ public class DirectoryTaxonomyWriter imp
return indexWriter.maxDoc();
}
- private boolean alreadyCalledFillCache = false;
-
/**
* Set the number of cache misses before an attempt is made to read the
* entire taxonomy into the in-memory cache.
@@ -742,94 +745,88 @@ public class DirectoryTaxonomyWriter imp
cacheMissesUntilFill = i;
}
- private int cacheMissesUntilFill = 11;
+ // we need to guarantee that if several threads call this concurrently, only
+ // one executes it, and after it returns, the cache is updated and is either
+ // complete or not.
+ private synchronized void perhapsFillCache() throws IOException {
+ if (cacheMisses.get() < cacheMissesUntilFill) {
+ return;
+ }
+
+ if (!shouldFillCache) {
+ // we already filled the cache once, there's no need to re-fill it
+ return;
+ }
+ shouldFillCache = false;
+
+ initReaderManager();
- private boolean perhapsFillCache() throws IOException {
- // Note: we assume that we're only called when cacheIsComplete==false.
- // TODO (Facet): parametrize this criterion:
- if (cacheMisses < cacheMissesUntilFill) {
- return false;
- }
- // If the cache was already filled (or we decided not to fill it because
- // there was no room), there is no sense in trying it again.
- if (alreadyCalledFillCache) {
- return false;
- }
- alreadyCalledFillCache = true;
- // TODO (Facet): we should probably completely clear the cache before starting
- // to read it?
- if (reader == null) {
- openInternalReader();
- }
-
- if (!cache.hasRoom(reader.numDocs())) {
- return false;
- }
-
- CategoryPath cp = new CategoryPath();
- TermsEnum termsEnum = null;
- DocsEnum docsEnum = null;
- int base = 0;
- for (AtomicReader r : reader.getSequentialSubReaders()) {
- Terms terms = r.terms(Consts.FULL);
- if (terms != null) { // cannot really happen, but be on the safe side
- termsEnum = terms.iterator(termsEnum);
- while (termsEnum.next() != null) {
- BytesRef t = termsEnum.term();
- // Since we guarantee uniqueness of categories, each term has exactly
- // one document. Also, since we do not allow removing categories (and
- // hence documents), there are no deletions in the index. Therefore, it
- // is sufficient to call next(), and then doc(), exactly once with no
- // 'validation' checks.
- cp.clear();
- cp.add(t.utf8ToString(), delimiter);
- docsEnum = termsEnum.docs(null, docsEnum, false);
- cache.put(cp, docsEnum.nextDoc() + base);
+ boolean aborted = false;
+ DirectoryReader reader = readerManager.acquire();
+ try {
+ CategoryPath cp = new CategoryPath();
+ TermsEnum termsEnum = null;
+ DocsEnum docsEnum = null;
+ int base = 0;
+ for (AtomicReader r : reader.getSequentialSubReaders()) {
+ Terms terms = r.terms(Consts.FULL);
+ if (terms != null) { // cannot really happen, but be on the safe side
+ termsEnum = terms.iterator(termsEnum);
+ while (termsEnum.next() != null) {
+ if (!cache.isFull()) {
+ BytesRef t = termsEnum.term();
+ // Since we guarantee uniqueness of categories, each term has exactly
+ // one document. Also, since we do not allow removing categories (and
+ // hence documents), there are no deletions in the index. Therefore, it
+ // is sufficient to call next(), and then doc(), exactly once with no
+ // 'validation' checks.
+ cp.clear();
+ cp.add(t.utf8ToString(), delimiter);
+ docsEnum = termsEnum.docs(null, docsEnum, false);
+ boolean res = cache.put(cp, docsEnum.nextDoc() + base);
+ assert !res : "entries should not have been evicted from the cache";
+ } else {
+ // the cache is full and the next put() will evict entries from it, therefore abort the iteration.
+ aborted = true;
+ break;
+ }
+ }
}
+ if (aborted) {
+ break;
+ }
+ base += r.maxDoc(); // we don't have any deletions, so we're ok
}
- base += r.maxDoc(); // we don't have any deletions, so we're ok
+ } finally {
+ readerManager.release(reader);
}
- /*Terms terms = MultiFields.getTerms(reader, Consts.FULL);
- // The check is done here to avoid checking it on every iteration of the
- // below loop. A null term wlil be returned if there are no terms in the
- // lexicon, or after the Consts.FULL term. However while the loop is
- // executed we're safe, because we only iterate as long as there are next()
- // terms.
- if (terms != null) {
- TermsEnum termsEnum = terms.iterator(null);
- Bits liveDocs = MultiFields.getLiveDocs(reader);
- DocsEnum docsEnum = null;
- while (termsEnum.next() != null) {
- BytesRef t = termsEnum.term();
- // Since we guarantee uniqueness of categories, each term has exactly
- // one document. Also, since we do not allow removing categories (and
- // hence documents), there are no deletions in the index. Therefore, it
- // is sufficient to call next(), and then doc(), exactly once with no
- // 'validation' checks.
- docsEnum = termsEnum.docs(liveDocs, docsEnum, false);
- docsEnum.nextDoc();
- cp.clear();
- cp.add(t.utf8ToString(), delimiter);
- cache.put(cp, docsEnum.docID());
- }
- }*/
- cacheIsComplete = true;
- // No sense to keep the reader open - we will not need to read from it
- // if everything is in the cache.
- reader.close();
- reader = null;
- return true;
+ cacheIsComplete = !aborted;
+ if (cacheIsComplete) {
+ synchronized (this) {
+ // everything is in the cache, so no need to keep readerManager open.
+ // this block is executed in a sync block so that it works well with
+ // initReaderManager called in parallel.
+ readerManager.close();
+ readerManager = null;
+ }
+ }
}
- private ParentArray parentArray;
- private synchronized ParentArray getParentArray() throws IOException {
- if (parentArray==null) {
- if (reader == null) {
- openInternalReader();
+ private ParentArray getParentArray() throws IOException {
+ if (parentArray == null) {
+ synchronized (this) {
+ if (parentArray == null) {
+ initReaderManager();
+ parentArray = new ParentArray();
+ DirectoryReader reader = readerManager.acquire();
+ try {
+ parentArray.refresh(reader);
+ } finally {
+ readerManager.release(reader);
+ }
+ }
}
- parentArray = new ParentArray();
- parentArray.refresh(reader);
}
return parentArray;
}
@@ -1029,18 +1026,18 @@ public class DirectoryTaxonomyWriter imp
* {@link IndexWriter#addIndexes(Directory...)} to replace both the taxonomy
* as well as the search index content.
*/
- public void replaceTaxonomy(Directory taxoDir) throws IOException {
+ public synchronized void replaceTaxonomy(Directory taxoDir) throws IOException {
// replace the taxonomy by doing IW optimized operations
indexWriter.deleteAll();
indexWriter.addIndexes(taxoDir);
- refreshInternalReader();
+ shouldRefreshReaderManager = true;
nextID = indexWriter.maxDoc();
// need to clear the cache, so that addCategory won't accidentally return
// old categories that are in the cache.
cache.clear();
cacheIsComplete = false;
- alreadyCalledFillCache = false;
+ shouldFillCache = true;
// update createTime as a taxonomy replace is just like it has be recreated
createTime = Long.toString(System.nanoTime());
Modified: lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/TaxonomyWriterCache.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/TaxonomyWriterCache.java?rev=1349214&r1=1349213&r2=1349214&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/TaxonomyWriterCache.java (original)
+++ lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/TaxonomyWriterCache.java Tue Jun 12 09:40:23 2012
@@ -97,20 +97,13 @@ public interface TaxonomyWriterCache {
* If the given length is negative or bigger than the path's actual
* length, the full path is taken.
*/
- public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal);
-
+ public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal);
+
/**
- * Sometimes the cache is either unlimited in size, or limited by a very
- * big size, and in that case when we add a lot of categories it might
- * make sense to pre-load the cache with all the existing categories.
- * However, this pre-load does not make sense when the allowed cache
- * size is small. The hasRoom() method allows to differentiate between
- * these cases.
- * <P>
- * After hasRoom(n) returned <code>true</code>, the following n put()
- * should return false (meaning that the cache was not cleared).
+ * Returns true if the cache is full, such that the next {@link #put} will
+ * evict entries from it, false otherwise.
*/
- public boolean hasRoom(int numberOfEntries);
+ public boolean isFull();
/**
* Clears the content of the cache. Unlike {@link #close()}, the caller can
Modified: lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/cl2o/Cl2oTaxonomyWriterCache.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/cl2o/Cl2oTaxonomyWriterCache.java?rev=1349214&r1=1349213&r2=1349214&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/cl2o/Cl2oTaxonomyWriterCache.java (original)
+++ lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/cl2o/Cl2oTaxonomyWriterCache.java Tue Jun 12 09:40:23 2012
@@ -37,7 +37,7 @@ public class Cl2oTaxonomyWriterCache imp
private final int initialCapcity, numHashArrays;
private final float loadFactor;
- private CompactLabelToOrdinal cache;
+ private volatile CompactLabelToOrdinal cache;
public Cl2oTaxonomyWriterCache(int initialCapcity, float loadFactor, int numHashArrays) {
this.cache = new CompactLabelToOrdinal(initialCapcity, loadFactor, numHashArrays);
@@ -48,7 +48,12 @@ public class Cl2oTaxonomyWriterCache imp
@Override
public void clear() {
- cache = new CompactLabelToOrdinal(initialCapcity, loadFactor, numHashArrays);
+ lock.writeLock().lock();
+ try {
+ cache = new CompactLabelToOrdinal(initialCapcity, loadFactor, numHashArrays);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
@@ -57,9 +62,9 @@ public class Cl2oTaxonomyWriterCache imp
}
@Override
- public boolean hasRoom(int n) {
- // This cache is unlimited, so we always have room for remembering more:
- return true;
+ public boolean isFull() {
+ // This cache is never full
+ return false;
}
@Override
Modified: lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/LruTaxonomyWriterCache.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/LruTaxonomyWriterCache.java?rev=1349214&r1=1349213&r2=1349214&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/LruTaxonomyWriterCache.java (original)
+++ lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/LruTaxonomyWriterCache.java Tue Jun 12 09:40:23 2012
@@ -61,12 +61,12 @@ public class LruTaxonomyWriterCache impl
}
@Override
- public synchronized boolean hasRoom(int n) {
- return n <= (cache.getMaxSize() - cache.getSize());
+ public synchronized boolean isFull() {
+ return cache.getSize() == cache.getMaxSize();
}
@Override
- public void clear() {
+ public synchronized void clear() {
cache.clear();
}
Modified: lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/NameIntCacheLRU.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/NameIntCacheLRU.java?rev=1349214&r1=1349213&r2=1349214&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/NameIntCacheLRU.java (original)
+++ lucene/dev/trunk/lucene/facet/src/java/org/apache/lucene/facet/taxonomy/writercache/lru/NameIntCacheLRU.java Tue Jun 12 09:40:23 2012
@@ -105,7 +105,7 @@ class NameIntCacheLRU {
}
private boolean isCacheFull() {
- return (cache.size()>maxCacheSize);
+ return cache.size() > maxCacheSize;
}
void clear() {
Modified: lucene/dev/trunk/lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestDirectoryTaxonomyWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestDirectoryTaxonomyWriter.java?rev=1349214&r1=1349213&r2=1349214&view=diff
==============================================================================
--- lucene/dev/trunk/lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestDirectoryTaxonomyWriter.java (original)
+++ lucene/dev/trunk/lucene/facet/src/test/org/apache/lucene/facet/taxonomy/directory/TestDirectoryTaxonomyWriter.java Tue Jun 12 09:40:23 2012
@@ -59,7 +59,7 @@ public class TestDirectoryTaxonomyWriter
@Override
public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) { return true; }
@Override
- public boolean hasRoom(int numberOfEntries) { return false; }
+ public boolean isFull() { return true; }
@Override
public void clear() {}
@@ -217,14 +217,24 @@ public class TestDirectoryTaxonomyWriter
}
public void testConcurrency() throws Exception {
- int ncats = atLeast(100000); // add many categories
+ final int ncats = atLeast(100000); // add many categories
final int range = ncats * 3; // affects the categories selection
final AtomicInteger numCats = new AtomicInteger(ncats);
- Directory dir = newDirectory();
+ final Directory dir = newDirectory();
final ConcurrentHashMap<Integer,Integer> values = new ConcurrentHashMap<Integer,Integer>();
- TaxonomyWriterCache cache = random().nextBoolean()
- ? new Cl2oTaxonomyWriterCache(1024, 0.15f, 3)
- : new LruTaxonomyWriterCache(ncats / 10);
+ final double d = random().nextDouble();
+ final TaxonomyWriterCache cache;
+ if (d < 0.7) {
+ // this is the fastest, yet most memory consuming
+ cache = new Cl2oTaxonomyWriterCache(1024, 0.15f, 3);
+ } else if (TEST_NIGHTLY && d > 0.98) {
+ // this is the slowest, but tests the writer concurrency when no caching is done.
+ // only pick it during NIGHTLY tests, and even then, with very low chances.
+ cache = new NoOpCache();
+ } else {
+ // this is slower than CL2O, but less memory consuming, and exercises finding categories on disk too.
+ cache = new LruTaxonomyWriterCache(ncats / 10);
+ }
final DirectoryTaxonomyWriter tw = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE, cache);
Thread[] addThreads = new Thread[atLeast(4)];
for (int z = 0; z < addThreads.length; z++) {
@@ -250,7 +260,7 @@ public class TestDirectoryTaxonomyWriter
tw.close();
DirectoryTaxonomyReader dtr = new DirectoryTaxonomyReader(dir);
- assertEquals(values.size() + 2, dtr.getSize()); // +2 for root category + "a"
+ assertEquals("mismatch number of categories", values.size() + 2, dtr.getSize()); // +2 for root category + "a"
for (Integer value : values.keySet()) {
assertTrue("category not found a/" + value, dtr.getOrdinal(new CategoryPath("a", value.toString())) > 0);
}