You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/09/06 16:25:52 UTC
[1/2] nifi git commit: NIFI-2681: Refactored IndexManager into an
interface and renamed the existing implementation to CachingIndexManager.
Implemented a new SimpleIndexManager that performs no caching of
IndexSearchers.
Repository: nifi
Updated Branches:
refs/heads/master a9d029d74 -> 088125451
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
new file mode 100644
index 0000000..834177f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSimpleIndexManager {
+ @BeforeClass
+ public static void setLogLevel() {
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+ }
+
+
+ @Test
+ public void testMultipleWritersSimultaneouslySameIndex() throws IOException {
+ final SimpleIndexManager mgr = new SimpleIndexManager();
+ final File dir = new File("target/" + UUID.randomUUID().toString());
+ try {
+ final IndexWriter writer1 = mgr.borrowIndexWriter(dir);
+ final IndexWriter writer2 = mgr.borrowIndexWriter(dir);
+
+ final Document doc1 = new Document();
+ doc1.add(new StringField("id", "1", Store.YES));
+
+ final Document doc2 = new Document();
+ doc2.add(new StringField("id", "2", Store.YES));
+
+ writer1.addDocument(doc1);
+ writer2.addDocument(doc2);
+ mgr.returnIndexWriter(dir, writer2);
+ mgr.returnIndexWriter(dir, writer1);
+
+ final IndexSearcher searcher = mgr.borrowIndexSearcher(dir);
+ final TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 2);
+ assertEquals(2, topDocs.totalHits);
+ mgr.returnIndexSearcher(dir, searcher);
+ } finally {
+ FileUtils.deleteFile(dir, true);
+ }
+ }
+
+}
[2/2] nifi git commit: NIFI-2681: Refactored IndexManager into an
interface and renamed the existing implementation to CachingIndexManager.
Implemented a new SimpleIndexManager that performs no caching of
IndexSearchers.
Posted by bb...@apache.org.
NIFI-2681: Refactored IndexManager into an interface and renamed the existing implementation to CachingIndexManager. Implemented a new SimpleIndexManager that performs no caching of IndexSearchers.
This closes #958.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/08812545
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/08812545
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/08812545
Branch: refs/heads/master
Commit: 088125451b7f15684752a914a3a83a834fac1a50
Parents: a9d029d
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Aug 26 20:03:16 2016 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Sep 6 12:25:28 2016 -0400
----------------------------------------------------------------------
.../PersistentProvenanceRepository.java | 177 +++---
.../provenance/lucene/CachingIndexManager.java | 536 +++++++++++++++++++
.../nifi/provenance/lucene/IndexManager.java | 514 +-----------------
.../provenance/lucene/SimpleIndexManager.java | 209 ++++++++
.../TestPersistentProvenanceRepository.java | 7 +-
.../lucene/TestCachingIndexManager.java | 114 ++++
.../provenance/lucene/TestIndexManager.java | 114 ----
.../lucene/TestSimpleIndexManager.java | 72 +++
8 files changed, 1034 insertions(+), 709 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index da3a6f5..0788716 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -16,6 +16,51 @@
*/
package org.apache.nifi.provenance;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexNotFoundException;
@@ -46,6 +91,7 @@ import org.apache.nifi.provenance.lucene.IndexSearch;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.lucene.LineageQuery;
import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.lucene.SimpleIndexManager;
import org.apache.nifi.provenance.lucene.UpdateMinimumEventId;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
@@ -68,51 +114,6 @@ import org.apache.nifi.web.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
public class PersistentProvenanceRepository implements ProvenanceRepository {
public static final String EVENT_CATEGORY = "Provenance Repository";
@@ -226,7 +227,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
this.indexConfig = new IndexConfiguration(configuration);
- this.indexManager = new IndexManager();
+ this.indexManager = new SimpleIndexManager();
this.alwaysSync = configuration.isAlwaysSync();
this.rolloverCheckMillis = rolloverCheckMillis;
@@ -1303,57 +1304,61 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
final Runnable rolloverRunnable = new Runnable() {
@Override
public void run() {
-
File fileRolledOver = null;
try {
- fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
- } catch (final IOException ioe) {
- logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
- logger.error("", ioe);
- }
-
- if (fileRolledOver != null) {
-
- final File file = fileRolledOver;
-
- // update our map of id to Path
- // We need to make sure that another thread doesn't also update the map at the same time. We cannot
- // use the write lock when purging old events, and we want to use the same approach here.
- boolean updated = false;
- final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
- while (!updated) {
- final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
- final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
- newIdToPathMap.putAll(existingPathMap);
- newIdToPathMap.put(fileFirstEventId, file.toPath());
- updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
+ try {
+ fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
+ } catch (final IOException ioe) {
+ logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+ logger.error("", ioe);
}
- logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
- }
-
- //if files were rolled over or if out of retries stop the future
- if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
+ if (fileRolledOver != null) {
+
+ final File file = fileRolledOver;
+
+ // update our map of id to Path
+ // We need to make sure that another thread doesn't also update the map at the same time. We cannot
+ // use the write lock when purging old events, and we want to use the same approach here.
+ boolean updated = false;
+ final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+ while (!updated) {
+ final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
+ final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
+ newIdToPathMap.putAll(existingPathMap);
+ newIdToPathMap.put(fileFirstEventId, file.toPath());
+ updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
+ }
- if (fileRolledOver == null && retryAttempts.get() == 0) {
- logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
+ logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
}
- rolloverCompletions.getAndIncrement();
+ //if files were rolled over or if out of retries stop the future
+ if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
- // Cancel the future so that we don't run anymore
- Future<?> future;
- while ((future = futureReference.get()) == null) {
- try {
- Thread.sleep(10L);
- } catch (final InterruptedException ie) {
+ if (fileRolledOver == null && retryAttempts.get() == 0) {
+ logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
}
- }
- future.cancel(false);
- } else {
- logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
+ rolloverCompletions.getAndIncrement();
+
+ // Cancel the future so that we don't run anymore
+ Future<?> future;
+ while ((future = futureReference.get()) == null) {
+ try {
+ Thread.sleep(10L);
+ } catch (final InterruptedException ie) {
+ }
+ }
+ future.cancel(false);
+
+ } else {
+ logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
+ }
+ } catch (final Exception e) {
+ logger.error("Failed to merge journals. Will try again. journalsToMerge: {}, storageDir: {}, cause: {}", journalsToMerge, storageDir, e.toString());
+ logger.error("", e);
}
}
};
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
new file mode 100644
index 0000000..ddfa0db
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CachingIndexManager implements Closeable, IndexManager {
+ private static final Logger logger = LoggerFactory.getLogger(CachingIndexManager.class);
+
+ private final Lock lock = new ReentrantLock();
+ private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+ private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+ public void removeIndex(final File indexDirectory) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.info("Removing index {}", indexDirectory);
+
+ lock.lock();
+ try {
+ final IndexWriterCount count = writerCounts.remove(absoluteFile);
+ if ( count != null ) {
+ try {
+ count.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+
+ final List<ActiveIndexSearcher> searcherList = activeSearchers.remove(absoluteFile);
+ if (searcherList != null) {
+ for ( final ActiveIndexSearcher searcher : searcherList ) {
+ try {
+ searcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher {} for {} due to {}",
+ searcher.getSearcher(), absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.trace("Borrowing index writer for {}", indexingDirectory);
+
+ lock.lock();
+ try {
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final List<Closeable> closeables = new ArrayList<>();
+ final Directory directory = FSDirectory.open(indexingDirectory);
+ closeables.add(directory);
+
+ try {
+ final Analyzer analyzer = new StandardAnalyzer();
+ closeables.add(analyzer);
+
+ final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+ config.setWriteLockTimeout(300000L);
+
+ final IndexWriter indexWriter = new IndexWriter(directory, config);
+ writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+ logger.debug("Providing new index writer for {}", indexingDirectory);
+ } catch (final IOException ioe) {
+ for ( final Closeable closeable : closeables ) {
+ try {
+ closeable.close();
+ } catch (final IOException ioe2) {
+ ioe.addSuppressed(ioe2);
+ }
+ }
+
+ throw ioe;
+ }
+
+ writerCounts.put(absoluteFile, writerCount);
+
+ // Mark any active searchers as poisoned because we are updating the index
+ final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
+ if ( searchers != null ) {
+ for (final ActiveIndexSearcher activeSearcher : searchers) {
+ logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
+ activeSearcher.poison();
+ }
+ }
+ } else {
+ logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+ }
+
+ return writerCount.getWriter();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+ lock.lock();
+ try {
+ final IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+ try {
+ if ( count == null ) {
+ logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ + "This could potentially lead to a resource leak", writer, indexingDirectory);
+ writer.close();
+ } else if ( count.getCount() <= 1 ) {
+ // we are finished with this writer.
+ logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
+ count.close();
+ } else {
+ // decrement the count.
+ logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+ }
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+ final File absoluteFile = indexDir.getAbsoluteFile();
+ logger.trace("Borrowing index searcher for {}", indexDir);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ currentlyCached = new ArrayList<>();
+ activeSearchers.put(absoluteFile, currentlyCached);
+ } else {
+ // keep track of any searchers that have been closed so that we can remove them
+ // from our cache later.
+ for (final ActiveIndexSearcher searcher : currentlyCached) {
+ if (searcher.isCache()) {
+ // if the searcher is poisoned, we want to close and expire it.
+ if (searcher.isPoisoned()) {
+ continue;
+ }
+
+ // if there are no references to the reader, it will have been closed. Since there is no
+ // isClosed() method, this is how we determine whether it's been closed or not.
+ final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+ if (refCount <= 0) {
+ // if refCount == 0, then the reader has been closed, so we cannot use the searcher
+ logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+ + "removing cached searcher", absoluteFile, refCount);
+ continue;
+ }
+
+ final int referenceCount = searcher.incrementReferenceCount();
+ logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
+ return searcher.getSearcher();
+ }
+ }
+ }
+
+ // We found no cached Index Readers. Create a new one. To do this, we need to check
+ // if we have an Index Writer, and if so create a Reader based on the Index Writer.
+ // This will provide us a 'near real time' index reader.
+ final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount == null ) {
+ final Directory directory = FSDirectory.open(absoluteFile);
+ logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+ try {
+ final DirectoryReader directoryReader = DirectoryReader.open(directory);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we want to cache the searcher that we create, since it's just a reader.
+ final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
+ currentlyCached.add(cached);
+
+ return cached.getSearcher();
+ } catch (final IOException e) {
+ logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
+ logger.error("", e);
+
+ try {
+ directory.close();
+ } catch (final IOException ioe) {
+ e.addSuppressed(ioe);
+ }
+
+ throw e;
+ }
+ } else {
+ logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+ + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+ // increment the writer count to ensure that it's kept open.
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+ // create a new Index Searcher from the writer so that we don't have an issue with trying
+ // to read from a directory that's locked. If we get the "no segments* file found" with
+ // Lucene, this indicates that an IndexWriter already has the directory open.
+ final IndexWriter writer = writerCount.getWriter();
+ final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ // we don't want to cache this searcher because it's based on a writer, so we want to get
+ // new values the next time that we search.
+ final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
+
+ currentlyCached.add(activeSearcher);
+ return activeSearcher.getSearcher();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+ final File absoluteFile = indexDirectory.getAbsoluteFile();
+ logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
+
+ lock.lock();
+ try {
+ // check if we already have a reader cached.
+ final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+ if ( currentlyCached == null ) {
+ logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+ + "result in a resource leak", indexDirectory);
+ return;
+ }
+
+ // Check if the given searcher is in our list. We use an Iterator to do this so that if we
+ // find it we can call remove() on the iterator if need be.
+ final Iterator<ActiveIndexSearcher> itr = new ArrayList<>(currentlyCached).iterator();
+ boolean activeSearcherFound = false;
+ while (itr.hasNext()) {
+ final ActiveIndexSearcher activeSearcher = itr.next();
+ if ( activeSearcher.getSearcher().equals(searcher) ) {
+ activeSearcherFound = true;
+ if ( activeSearcher.isCache() ) {
+ // if the searcher is poisoned, close it and remove from "pool". Otherwise,
+ // just decrement the count. Note here that when we call close() it won't actually close
+ // the underlying directory reader unless there are no more references to it
+ if ( activeSearcher.isPoisoned() ) {
+ itr.remove();
+
+ try {
+ activeSearcher.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+
+ return;
+ } else {
+ // the searcher is cached. Just leave it open.
+ final int refCount = activeSearcher.decrementReferenceCount();
+ logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
+ return;
+ }
+ } else {
+ // searcher is not cached. It was created from a writer, and we want
+ // the newest updates the next time that we get a searcher, so we will
+ // go ahead and close this one out.
+ itr.remove();
+
+ // decrement the writer count because we incremented it when creating the searcher
+ final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if ( writerCount != null ) {
+ if ( writerCount.getCount() <= 1 ) {
+ try {
+ logger.debug("Index searcher for {} is not cached. Writer count is "
+ + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+ writerCount.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ } else {
+ logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+ + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(),
+ writerCount.getCount() - 1));
+ }
+ }
+
+ try {
+ logger.debug("Closing Index Searcher for {}", indexDirectory);
+ final boolean allReferencesClosed = activeSearcher.close();
+ if (!allReferencesClosed) {
+ currentlyCached.add(activeSearcher);
+ }
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+ }
+ }
+
+ if (!activeSearcherFound) {
+ logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. "
+ + "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ logger.debug("Closing Index Manager");
+
+ lock.lock();
+ try {
+ IOException ioe = null;
+
+ for ( final IndexWriterCount count : writerCounts.values() ) {
+ try {
+ count.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+
+ for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+ for (final ActiveIndexSearcher searcher : searcherList) {
+ try {
+ searcher.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+ }
+
+ if ( ioe != null ) {
+ throw ioe;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ private static void close(final Closeable... closeables) throws IOException {
+ IOException ioe = null;
+ for ( final Closeable closeable : closeables ) {
+ if ( closeable == null ) {
+ continue;
+ }
+
+ try {
+ closeable.close();
+ } catch (final IOException e) {
+ if ( ioe == null ) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+
+ if ( ioe != null ) {
+ throw ioe;
+ }
+ }
+
+
+ private static class ActiveIndexSearcher {
+ private final IndexSearcher searcher;
+ private final DirectoryReader directoryReader;
+ private final File indexDirectory;
+ private final Directory directory;
+ private final boolean cache;
+ private final AtomicInteger referenceCount = new AtomicInteger(1);
+ private volatile boolean poisoned = false;
+
+ public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
+ final Directory directory, final boolean cache) {
+ this.searcher = searcher;
+ this.directoryReader = directoryReader;
+ this.indexDirectory = indexDirectory;
+ this.directory = directory;
+ this.cache = cache;
+ }
+
+ public boolean isCache() {
+ return cache;
+ }
+
+ public IndexSearcher getSearcher() {
+ return searcher;
+ }
+
+ public boolean isPoisoned() {
+ return poisoned;
+ }
+
+ public void poison() {
+ this.poisoned = true;
+ }
+
+ public int incrementReferenceCount() {
+ return referenceCount.incrementAndGet();
+ }
+
+ public int decrementReferenceCount() {
+ return referenceCount.decrementAndGet();
+ }
+
+ public boolean close() throws IOException {
+ final int updatedRefCount = referenceCount.decrementAndGet();
+ if (updatedRefCount <= 0) {
+ logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
+ CachingIndexManager.close(directoryReader, directory);
+ return true;
+ } else {
+ logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]";
+ }
+ }
+
+
+ private static class IndexWriterCount implements Closeable {
+ private final IndexWriter writer;
+ private final Analyzer analyzer;
+ private final Directory directory;
+ private final int count;
+
+ public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+ this.writer = writer;
+ this.analyzer = analyzer;
+ this.directory = directory;
+ this.count = count;
+ }
+
+ public Analyzer getAnalyzer() {
+ return analyzer;
+ }
+
+ public Directory getDirectory() {
+ return directory;
+ }
+
+ public IndexWriter getWriter() {
+ return writer;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public void close() throws IOException {
+ CachingIndexManager.close(writer, analyzer, directory);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 3d38cac..f84021f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -14,522 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.nifi.provenance.lucene;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IndexManager implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-
- private final Lock lock = new ReentrantLock();
- private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
- private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
-
-
- public void removeIndex(final File indexDirectory) {
- final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.info("Removing index {}", indexDirectory);
-
- lock.lock();
- try {
- final IndexWriterCount count = writerCounts.remove(absoluteFile);
- if ( count != null ) {
- try {
- count.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
-
- for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
- for ( final ActiveIndexSearcher searcher : searcherList ) {
- try {
- searcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher {} for {} due to {}",
- searcher.getSearcher(), absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
- final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.trace("Borrowing index writer for {}", indexingDirectory);
-
- lock.lock();
- try {
- IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount == null ) {
- final List<Closeable> closeables = new ArrayList<>();
- final Directory directory = FSDirectory.open(indexingDirectory);
- closeables.add(directory);
-
- try {
- final Analyzer analyzer = new StandardAnalyzer();
- closeables.add(analyzer);
-
- final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
- config.setWriteLockTimeout(300000L);
-
- final IndexWriter indexWriter = new IndexWriter(directory, config);
- writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
- logger.debug("Providing new index writer for {}", indexingDirectory);
- } catch (final IOException ioe) {
- for ( final Closeable closeable : closeables ) {
- try {
- closeable.close();
- } catch (final IOException ioe2) {
- ioe.addSuppressed(ioe2);
- }
- }
-
- throw ioe;
- }
-
- writerCounts.put(absoluteFile, writerCount);
-
- // Mark any active searchers as poisoned because we are updating the index
- final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
- if ( searchers != null ) {
- for (final ActiveIndexSearcher activeSearcher : searchers) {
- logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
- activeSearcher.poison();
- }
- }
- } else {
- logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
- }
-
- return writerCount.getWriter();
- } finally {
- lock.unlock();
- }
- }
-
- public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
- final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
-
- lock.lock();
- try {
- final IndexWriterCount count = writerCounts.remove(absoluteFile);
-
- try {
- if ( count == null ) {
- logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
- + "This could potentially lead to a resource leak", writer, indexingDirectory);
- writer.close();
- } else if ( count.getCount() <= 1 ) {
- // we are finished with this writer.
- logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
- count.close();
- } else {
- // decrement the count.
- logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
- writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
- }
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
- final File absoluteFile = indexDir.getAbsoluteFile();
- logger.trace("Borrowing index searcher for {}", indexDir);
-
- lock.lock();
- try {
- // check if we already have a reader cached.
- List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
- if ( currentlyCached == null ) {
- currentlyCached = new ArrayList<>();
- activeSearchers.put(absoluteFile, currentlyCached);
- } else {
- // keep track of any searchers that have been closed so that we can remove them
- // from our cache later.
- for (final ActiveIndexSearcher searcher : currentlyCached) {
- if (searcher.isCache()) {
- // if the searcher is poisoned, we want to close and expire it.
- if (searcher.isPoisoned()) {
- continue;
- }
-
- // if there are no references to the reader, it will have been closed. Since there is no
- // isClosed() method, this is how we determine whether it's been closed or not.
- final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
- if (refCount <= 0) {
- // if refCount == 0, then the reader has been closed, so we cannot use the searcher
- logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
- + "removing cached searcher", absoluteFile, refCount);
- continue;
- }
-
- final int referenceCount = searcher.incrementReferenceCount();
- logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
- return searcher.getSearcher();
- }
- }
- }
-
- // We found no cached Index Readers. Create a new one. To do this, we need to check
- // if we have an Index Writer, and if so create a Reader based on the Index Writer.
- // This will provide us a 'near real time' index reader.
- final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount == null ) {
- final Directory directory = FSDirectory.open(absoluteFile);
- logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
-
- try {
- final DirectoryReader directoryReader = DirectoryReader.open(directory);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- // we want to cache the searcher that we create, since it's just a reader.
- final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
- currentlyCached.add(cached);
-
- return cached.getSearcher();
- } catch (final IOException e) {
- logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
- logger.error("", e);
-
- try {
- directory.close();
- } catch (final IOException ioe) {
- e.addSuppressed(ioe);
- }
-
- throw e;
- }
- } else {
- logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
- + "counter to {}", indexDir, writerCount.getCount() + 1);
-
- // increment the writer count to ensure that it's kept open.
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-
- // create a new Index Searcher from the writer so that we don't have an issue with trying
- // to read from a directory that's locked. If we get the "no segments* file found" with
- // Lucene, this indicates that an IndexWriter already has the directory open.
- final IndexWriter writer = writerCount.getWriter();
- final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
- final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
- // we don't want to cache this searcher because it's based on a writer, so we want to get
- // new values the next time that we search.
- final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
-
- currentlyCached.add(activeSearcher);
- return activeSearcher.getSearcher();
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
- final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
-
- lock.lock();
- try {
- // check if we already have a reader cached.
- final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
- if ( currentlyCached == null ) {
- logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
- + "result in a resource leak", indexDirectory);
- return;
- }
-
- // Check if the given searcher is in our list. We use an Iterator to do this so that if we
- // find it we can call remove() on the iterator if need be.
- final Iterator<ActiveIndexSearcher> itr = new ArrayList<>(currentlyCached).iterator();
- boolean activeSearcherFound = false;
- while (itr.hasNext()) {
- final ActiveIndexSearcher activeSearcher = itr.next();
- if ( activeSearcher.getSearcher().equals(searcher) ) {
- activeSearcherFound = true;
- if ( activeSearcher.isCache() ) {
- // if the searcher is poisoned, close it and remove from "pool". Otherwise,
- // just decrement the count. Note here that when we call close() it won't actually close
- // the underlying directory reader unless there are no more references to it
- if ( activeSearcher.isPoisoned() ) {
- itr.remove();
-
- try {
- activeSearcher.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
-
- return;
- } else {
- // the searcher is cached. Just leave it open.
- final int refCount = activeSearcher.decrementReferenceCount();
- logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
- return;
- }
- } else {
- // searcher is not cached. It was created from a writer, and we want
- // the newest updates the next time that we get a searcher, so we will
- // go ahead and close this one out.
- itr.remove();
-
- // decrement the writer count because we incremented it when creating the searcher
- final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
- if ( writerCount != null ) {
- if ( writerCount.getCount() <= 1 ) {
- try {
- logger.debug("Index searcher for {} is not cached. Writer count is "
- + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
-
- writerCount.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- } else {
- logger.debug("Index searcher for {} is not cached. Writer count is decremented "
- + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
-
- writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
- writerCount.getAnalyzer(), writerCount.getDirectory(),
- writerCount.getCount() - 1));
- }
- }
-
- try {
- logger.debug("Closing Index Searcher for {}", indexDirectory);
- final boolean allReferencesClosed = activeSearcher.close();
- if (!allReferencesClosed) {
- currentlyCached.add(activeSearcher);
- }
- } catch (final IOException ioe) {
- logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
- }
- }
- }
- }
- }
-
- if (!activeSearcherFound) {
- logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. "
- + "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory);
- }
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void close() throws IOException {
- logger.debug("Closing Index Manager");
-
- lock.lock();
- try {
- IOException ioe = null;
-
- for ( final IndexWriterCount count : writerCounts.values() ) {
- try {
- count.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
-
- for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
- for (final ActiveIndexSearcher searcher : searcherList) {
- try {
- searcher.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
- }
-
- if ( ioe != null ) {
- throw ioe;
- }
- } finally {
- lock.unlock();
- }
- }
-
-
- private static void close(final Closeable... closeables) throws IOException {
- IOException ioe = null;
- for ( final Closeable closeable : closeables ) {
- if ( closeable == null ) {
- continue;
- }
-
- try {
- closeable.close();
- } catch (final IOException e) {
- if ( ioe == null ) {
- ioe = e;
- } else {
- ioe.addSuppressed(e);
- }
- }
- }
-
- if ( ioe != null ) {
- throw ioe;
- }
- }
-
-
- private static class ActiveIndexSearcher {
- private final IndexSearcher searcher;
- private final DirectoryReader directoryReader;
- private final File indexDirectory;
- private final Directory directory;
- private final boolean cache;
- private final AtomicInteger referenceCount = new AtomicInteger(1);
- private volatile boolean poisoned = false;
-
- public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
- final Directory directory, final boolean cache) {
- this.searcher = searcher;
- this.directoryReader = directoryReader;
- this.indexDirectory = indexDirectory;
- this.directory = directory;
- this.cache = cache;
- }
-
- public boolean isCache() {
- return cache;
- }
-
- public IndexSearcher getSearcher() {
- return searcher;
- }
-
- public boolean isPoisoned() {
- return poisoned;
- }
-
- public void poison() {
- this.poisoned = true;
- }
-
- public int incrementReferenceCount() {
- return referenceCount.incrementAndGet();
- }
-
- public int decrementReferenceCount() {
- return referenceCount.decrementAndGet();
- }
-
- public boolean close() throws IOException {
- final int updatedRefCount = referenceCount.decrementAndGet();
- if (updatedRefCount <= 0) {
- logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
- IndexManager.close(directoryReader, directory);
- return true;
- } else {
- logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
- return false;
- }
- }
-
- @Override
- public String toString() {
- return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]";
- }
- }
-
-
- private static class IndexWriterCount implements Closeable {
- private final IndexWriter writer;
- private final Analyzer analyzer;
- private final Directory directory;
- private final int count;
-
- public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
- this.writer = writer;
- this.analyzer = analyzer;
- this.directory = directory;
- this.count = count;
- }
-
- public Analyzer getAnalyzer() {
- return analyzer;
- }
- public Directory getDirectory() {
- return directory;
- }
+public interface IndexManager extends Closeable {
+ IndexSearcher borrowIndexSearcher(File indexDir) throws IOException;
- public IndexWriter getWriter() {
- return writer;
- }
+ IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException;
- public int getCount() {
- return count;
- }
+ void removeIndex(final File indexDirectory);
- @Override
- public void close() throws IOException {
- IndexManager.close(writer, analyzer, directory);
- }
- }
+ void returnIndexSearcher(File indexDirectory, IndexSearcher searcher);
-}
+ void returnIndexWriter(File indexingDirectory, IndexWriter writer);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
new file mode 100644
index 0000000..daf6413
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleIndexManager implements IndexManager {
+ private static final Logger logger = LoggerFactory.getLogger(SimpleIndexManager.class);
+
+ private final ConcurrentMap<Object, List<Closeable>> closeables = new ConcurrentHashMap<>();
+ private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+ logger.debug("Creating index searcher for {}", indexDir);
+ final Directory directory = FSDirectory.open(indexDir);
+ final DirectoryReader directoryReader = DirectoryReader.open(directory);
+ final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+ final List<Closeable> closeableList = new ArrayList<>(2);
+ closeableList.add(directoryReader);
+ closeableList.add(directory);
+ closeables.put(searcher, closeableList);
+ logger.debug("Created index searcher {} for {}", searcher, indexDir);
+
+ return searcher;
+ }
+
+ @Override
+ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+ logger.debug("Closing index searcher {} for {}", searcher, indexDirectory);
+
+ final List<Closeable> closeableList = closeables.get(searcher);
+ if (closeableList != null) {
+ for (final Closeable closeable : closeableList) {
+ closeQuietly(closeable);
+ }
+ }
+
+ logger.debug("Closed index searcher {}", searcher);
+ }
+
+ @Override
+ public void removeIndex(final File indexDirectory) {
+ }
+
+
+ @Override
+ public synchronized IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.trace("Borrowing index writer for {}", indexingDirectory);
+
+ IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+ if (writerCount == null) {
+ final List<Closeable> closeables = new ArrayList<>();
+ final Directory directory = FSDirectory.open(indexingDirectory);
+ closeables.add(directory);
+
+ try {
+ final Analyzer analyzer = new StandardAnalyzer();
+ closeables.add(analyzer);
+
+ final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+ config.setWriteLockTimeout(300000L);
+
+ final IndexWriter indexWriter = new IndexWriter(directory, config);
+ writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+ logger.debug("Providing new index writer for {}", indexingDirectory);
+ } catch (final IOException ioe) {
+ for (final Closeable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (final IOException ioe2) {
+ ioe.addSuppressed(ioe2);
+ }
+ }
+
+ throw ioe;
+ }
+
+ writerCounts.put(absoluteFile, writerCount);
+ } else {
+ logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+ writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+ }
+
+ return writerCount.getWriter();
+ }
+
+
+ @Override
+ public synchronized void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+ final File absoluteFile = indexingDirectory.getAbsoluteFile();
+ logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+ final IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+ try {
+ if (count == null) {
+ logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ + "This could potentially lead to a resource leak", writer, indexingDirectory);
+ writer.close();
+ } else if (count.getCount() <= 1) {
+ // we are finished with this writer.
+ logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
+ writer.commit();
+ count.close();
+ } else {
+ // decrement the count.
+ logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+ writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+ }
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+ if (logger.isDebugEnabled()) {
+ logger.warn("", ioe);
+ }
+ }
+ }
+
+ private static void closeQuietly(final Closeable... closeables) {
+ for (final Closeable closeable : closeables) {
+ if (closeable == null) {
+ continue;
+ }
+
+ try {
+ closeable.close();
+ } catch (final Exception e) {
+ logger.warn("Failed to close {} due to {}", closeable, e);
+ }
+ }
+ }
+
+
+ private static class IndexWriterCount implements Closeable {
+ private final IndexWriter writer;
+ private final Analyzer analyzer;
+ private final Directory directory;
+ private final int count;
+
+ public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+ this.writer = writer;
+ this.analyzer = analyzer;
+ this.directory = directory;
+ this.count = count;
+ }
+
+ public Analyzer getAnalyzer() {
+ return analyzer;
+ }
+
+ public Directory getDirectory() {
+ return directory;
+ }
+
+ public IndexWriter getWriter() {
+ return writer;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeQuietly(writer, analyzer, directory);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 845d6ea..a210aa9 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -64,6 +64,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lucene.CachingIndexManager;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.search.Query;
@@ -496,16 +497,16 @@ public class TestPersistentProvenanceRepository {
final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2);
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- private IndexManager wrappedManager = null;
+ private CachingIndexManager wrappedManager = null;
// Create an IndexManager that adds a delay before returning the Index Searcher.
@Override
- protected synchronized IndexManager getIndexManager() {
+ protected synchronized CachingIndexManager getIndexManager() {
if (wrappedManager == null) {
final IndexManager mgr = super.getIndexManager();
final Logger logger = LoggerFactory.getLogger("IndexManager");
- wrappedManager = new IndexManager() {
+ wrappedManager = new CachingIndexManager() {
final AtomicInteger indexSearcherCount = new AtomicInteger(0);
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
new file mode 100644
index 0000000..36f0b00
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCachingIndexManager {
+
+ private File indexDir;
+ private CachingIndexManager manager;
+
+ @Before
+ public void setup() {
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+ manager = new CachingIndexManager();
+
+ indexDir = new File("target/testIndexManager/" + UUID.randomUUID().toString());
+ indexDir.mkdirs();
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ manager.close();
+
+ FileUtils.deleteFiles(Collections.singleton(indexDir), true);
+ }
+
+ @Test
+ public void test() throws IOException {
+ // Create and IndexWriter and add a document to the index, then close the writer.
+ // This gives us something that we can query.
+ final IndexWriter writer = manager.borrowIndexWriter(indexDir);
+ final Document doc = new Document();
+ doc.add(new StringField("unit test", "true", Store.YES));
+ writer.addDocument(doc);
+ manager.returnIndexWriter(indexDir, writer);
+
+ // Get an Index Searcher that we can use to query the index.
+ final IndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir);
+
+ // Ensure that we get the expected results.
+ assertCount(cachedSearcher, 1);
+
+ // While we already have an Index Searcher, get a writer for the same index.
+ // This will cause the Index Searcher to be marked as poisoned.
+ final IndexWriter writer2 = manager.borrowIndexWriter(indexDir);
+
+ // Obtain a new Index Searcher with the writer open. This Index Searcher should *NOT*
+ // be the same as the previous searcher because the new one will be a Near-Real-Time Index Searcher
+ // while the other is not.
+ final IndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir);
+ assertNotSame(cachedSearcher, nrtSearcher);
+
+ // Ensure that we get the expected query results.
+ assertCount(nrtSearcher, 1);
+
+ // Return the writer, so that there is no longer an active writer for the index.
+ manager.returnIndexWriter(indexDir, writer2);
+
+ // Ensure that we still get the same result.
+ assertCount(cachedSearcher, 1);
+ manager.returnIndexSearcher(indexDir, cachedSearcher);
+
+ // Ensure that our near-real-time index searcher still gets the same result.
+ assertCount(nrtSearcher, 1);
+ manager.returnIndexSearcher(indexDir, nrtSearcher);
+ }
+
+ private void assertCount(final IndexSearcher searcher, final int count) throws IOException {
+ final BooleanQuery query = new BooleanQuery();
+ query.add(new BooleanClause(new TermQuery(new Term("unit test", "true")), Occur.MUST));
+ final TopDocs topDocs = searcher.search(query, count * 10);
+ assertNotNull(topDocs);
+ assertEquals(1, topDocs.totalHits);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
deleted file mode 100644
index afe4512..0000000
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.provenance.lucene;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.UUID;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestIndexManager {
-
- private File indexDir;
- private IndexManager manager;
-
- @Before
- public void setup() {
- System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
- manager = new IndexManager();
-
- indexDir = new File("target/testIndexManager/" + UUID.randomUUID().toString());
- indexDir.mkdirs();
- }
-
- @After
- public void cleanup() throws IOException {
- manager.close();
-
- FileUtils.deleteFiles(Collections.singleton(indexDir), true);
- }
-
- @Test
- public void test() throws IOException {
- // Create and IndexWriter and add a document to the index, then close the writer.
- // This gives us something that we can query.
- final IndexWriter writer = manager.borrowIndexWriter(indexDir);
- final Document doc = new Document();
- doc.add(new StringField("unit test", "true", Store.YES));
- writer.addDocument(doc);
- manager.returnIndexWriter(indexDir, writer);
-
- // Get an Index Searcher that we can use to query the index.
- final IndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir);
-
- // Ensure that we get the expected results.
- assertCount(cachedSearcher, 1);
-
- // While we already have an Index Searcher, get a writer for the same index.
- // This will cause the Index Searcher to be marked as poisoned.
- final IndexWriter writer2 = manager.borrowIndexWriter(indexDir);
-
- // Obtain a new Index Searcher with the writer open. This Index Searcher should *NOT*
- // be the same as the previous searcher because the new one will be a Near-Real-Time Index Searcher
- // while the other is not.
- final IndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir);
- assertNotSame(cachedSearcher, nrtSearcher);
-
- // Ensure that we get the expected query results.
- assertCount(nrtSearcher, 1);
-
- // Return the writer, so that there is no longer an active writer for the index.
- manager.returnIndexWriter(indexDir, writer2);
-
- // Ensure that we still get the same result.
- assertCount(cachedSearcher, 1);
- manager.returnIndexSearcher(indexDir, cachedSearcher);
-
- // Ensure that our near-real-time index searcher still gets the same result.
- assertCount(nrtSearcher, 1);
- manager.returnIndexSearcher(indexDir, nrtSearcher);
- }
-
- private void assertCount(final IndexSearcher searcher, final int count) throws IOException {
- final BooleanQuery query = new BooleanQuery();
- query.add(new BooleanClause(new TermQuery(new Term("unit test", "true")), Occur.MUST));
- final TopDocs topDocs = searcher.search(query, count * 10);
- assertNotNull(topDocs);
- assertEquals(1, topDocs.totalHits);
- }
-}