Posted to commits@nifi.apache.org by bb...@apache.org on 2016/09/06 16:25:52 UTC

[1/2] nifi git commit: NIFI-2681: Refactored IndexManager into an interface and renamed the existing implementation to CachingIndexManager. Implemented a new SimpleIndexManager that performs no caching of IndexSearchers.

Repository: nifi
Updated Branches:
  refs/heads/master a9d029d74 -> 088125451


http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
new file mode 100644
index 0000000..834177f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSimpleIndexManager {
+    @BeforeClass
+    public static void setLogLevel() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+    }
+
+
+    @Test
+    public void testMultipleWritersSimultaneouslySameIndex() throws IOException {
+        final SimpleIndexManager mgr = new SimpleIndexManager();
+        final File dir = new File("target/" + UUID.randomUUID().toString());
+        try {
+            final IndexWriter writer1 = mgr.borrowIndexWriter(dir);
+            final IndexWriter writer2 = mgr.borrowIndexWriter(dir);
+
+            final Document doc1 = new Document();
+            doc1.add(new StringField("id", "1", Store.YES));
+
+            final Document doc2 = new Document();
+            doc2.add(new StringField("id", "2", Store.YES));
+
+            writer1.addDocument(doc1);
+            writer2.addDocument(doc2);
+            mgr.returnIndexWriter(dir, writer2);
+            mgr.returnIndexWriter(dir, writer1);
+
+            final IndexSearcher searcher = mgr.borrowIndexSearcher(dir);
+            final TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 2);
+            assertEquals(2, topDocs.totalHits);
+            mgr.returnIndexSearcher(dir, searcher);
+        } finally {
+            FileUtils.deleteFile(dir, true);
+        }
+    }
+
+}


[2/2] nifi git commit: NIFI-2681: Refactored IndexManager into an interface and renamed the existing implementation to CachingIndexManager. Implemented a new SimpleIndexManager that performs no caching of IndexSearchers.

Posted by bb...@apache.org.
NIFI-2681: Refactored IndexManager into an interface and renamed the existing implementation to CachingIndexManager. Implemented a new SimpleIndexManager that performs no caching of IndexSearchers.

This closes #958.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/08812545
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/08812545
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/08812545

Branch: refs/heads/master
Commit: 088125451b7f15684752a914a3a83a834fac1a50
Parents: a9d029d
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Aug 26 20:03:16 2016 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Sep 6 12:25:28 2016 -0400

----------------------------------------------------------------------
 .../PersistentProvenanceRepository.java         | 177 +++---
 .../provenance/lucene/CachingIndexManager.java  | 536 +++++++++++++++++++
 .../nifi/provenance/lucene/IndexManager.java    | 514 +-----------------
 .../provenance/lucene/SimpleIndexManager.java   | 209 ++++++++
 .../TestPersistentProvenanceRepository.java     |   7 +-
 .../lucene/TestCachingIndexManager.java         | 114 ++++
 .../provenance/lucene/TestIndexManager.java     | 114 ----
 .../lucene/TestSimpleIndexManager.java          |  72 +++
 8 files changed, 1034 insertions(+), 709 deletions(-)
----------------------------------------------------------------------
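For readers skimming the thread: after this refactoring, IndexManager.java is reduced to an interface that both CachingIndexManager and SimpleIndexManager implement. A minimal sketch of that interface, inferred from the borrow/return methods exercised in the diffs below (the exact declarations in the commit may differ):

    package org.apache.nifi.provenance.lucene;

    import java.io.Closeable;
    import java.io.File;
    import java.io.IOException;

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.search.IndexSearcher;

    // Sketch of the refactored IndexManager contract; the signatures are assumptions
    // based on the methods CachingIndexManager implements below, not the committed source.
    public interface IndexManager extends Closeable {
        // Obtain a writer for the given index directory; callers must return it when done.
        IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException;

        // Hand a borrowed writer back so the manager can reuse or close it.
        void returnIndexWriter(File indexingDirectory, IndexWriter writer);

        // Obtain a searcher for the given index directory; callers must return it when done.
        IndexSearcher borrowIndexSearcher(File indexDir) throws IOException;

        // Hand a borrowed searcher back so the manager can reuse or close it.
        void returnIndexSearcher(File indexDirectory, IndexSearcher searcher);

        // Discard any cached writers/searchers for an index that is being removed.
        void removeIndex(File indexDirectory);
    }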


http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index da3a6f5..0788716 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -16,6 +16,51 @@
  */
 package org.apache.nifi.provenance;
 
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexNotFoundException;
@@ -46,6 +91,7 @@ import org.apache.nifi.provenance.lucene.IndexSearch;
 import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.lucene.LineageQuery;
 import org.apache.nifi.provenance.lucene.LuceneUtil;
+import org.apache.nifi.provenance.lucene.SimpleIndexManager;
 import org.apache.nifi.provenance.lucene.UpdateMinimumEventId;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
@@ -68,51 +114,6 @@ import org.apache.nifi.web.ResourceNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 public class PersistentProvenanceRepository implements ProvenanceRepository {
 
     public static final String EVENT_CATEGORY = "Provenance Repository";
@@ -226,7 +227,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
         this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
         this.indexConfig = new IndexConfiguration(configuration);
-        this.indexManager = new IndexManager();
+        this.indexManager = new SimpleIndexManager();
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
 
@@ -1303,57 +1304,61 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                 final Runnable rolloverRunnable = new Runnable() {
                     @Override
                     public void run() {
-
                         File fileRolledOver = null;
 
                         try {
-                            fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
-                        } catch (final IOException ioe) {
-                            logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
-                            logger.error("", ioe);
-                        }
-
-                        if (fileRolledOver != null) {
-
-                            final File file = fileRolledOver;
-
-                            // update our map of id to Path
-                            // We need to make sure that another thread doesn't also update the map at the same time. We cannot
-                            // use the write lock when purging old events, and we want to use the same approach here.
-                            boolean updated = false;
-                            final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
-                            while (!updated) {
-                                final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
-                                final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
-                                newIdToPathMap.putAll(existingPathMap);
-                                newIdToPathMap.put(fileFirstEventId, file.toPath());
-                                updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
+                            try {
+                                fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
+                            } catch (final IOException ioe) {
+                                logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+                                logger.error("", ioe);
                             }
 
-                            logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
-                        }
-
-                        //if files were rolled over or if out of retries stop the future
-                        if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
+                            if (fileRolledOver != null) {
+
+                                final File file = fileRolledOver;
+
+                                // update our map of id to Path
+                                // We need to make sure that another thread doesn't also update the map at the same time. We cannot
+                                // use the write lock when purging old events, and we want to use the same approach here.
+                                boolean updated = false;
+                                final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+                                while (!updated) {
+                                    final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
+                                    final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
+                                    newIdToPathMap.putAll(existingPathMap);
+                                    newIdToPathMap.put(fileFirstEventId, file.toPath());
+                                    updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
+                                }
 
-                            if (fileRolledOver == null && retryAttempts.get() == 0) {
-                                logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
+                                logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
                             }
 
-                            rolloverCompletions.getAndIncrement();
+                            //if files were rolled over or if out of retries stop the future
+                            if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
 
-                            // Cancel the future so that we don't run anymore
-                            Future<?> future;
-                            while ((future = futureReference.get()) == null) {
-                                try {
-                                    Thread.sleep(10L);
-                                } catch (final InterruptedException ie) {
+                                if (fileRolledOver == null && retryAttempts.get() == 0) {
+                                    logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
                                 }
-                            }
-                            future.cancel(false);
 
-                        } else {
-                            logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
+                                rolloverCompletions.getAndIncrement();
+
+                                // Cancel the future so that we don't run anymore
+                                Future<?> future;
+                                while ((future = futureReference.get()) == null) {
+                                    try {
+                                        Thread.sleep(10L);
+                                    } catch (final InterruptedException ie) {
+                                    }
+                                }
+                                future.cancel(false);
+
+                            } else {
+                                logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
+                            }
+                        } catch (final Exception e) {
+                            logger.error("Failed to merge journals. Will try again. journalsToMerge: {}, storageDir: {}, cause: {}", journalsToMerge, storageDir, e.toString());
+                            logger.error("", e);
                         }
                     }
                 };

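The hunk above is mostly re-indentation: the existing rollover logic is wrapped in an additional outer try/catch so that an unexpected exception is logged and the scheduled task keeps retrying instead of dying silently. A minimal, self-contained sketch of that pattern (class and method names here are illustrative, not taken from the commit):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RolloverTaskSketch {
        private static final Logger logger = LoggerFactory.getLogger(RolloverTaskSketch.class);

        public void schedule(final ScheduledExecutorService exec) {
            exec.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        mergeJournalsOnce(); // stand-in for the journal merge / rollover work
                    } catch (final Exception e) {
                        // Without this catch-all, an escaping RuntimeException would
                        // suppress all further executions of the scheduled task.
                        logger.error("Failed to merge journals. Will try again.", e);
                    }
                }
            }, 0, 10, TimeUnit.SECONDS);
        }

        private void mergeJournalsOnce() {
            // hypothetical merge logic
        }
    }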
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
new file mode 100644
index 0000000..ddfa0db
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/CachingIndexManager.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CachingIndexManager implements Closeable, IndexManager {
+    private static final Logger logger = LoggerFactory.getLogger(CachingIndexManager.class);
+
+    private final Lock lock = new ReentrantLock();
+    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+    private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+    public void removeIndex(final File indexDirectory) {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.info("Removing index {}", indexDirectory);
+
+        lock.lock();
+        try {
+            final IndexWriterCount count = writerCounts.remove(absoluteFile);
+            if ( count != null ) {
+                try {
+                    count.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
+                }
+            }
+
+            final List<ActiveIndexSearcher> searcherList = activeSearchers.remove(absoluteFile);
+            if (searcherList != null) {
+                for ( final ActiveIndexSearcher searcher : searcherList ) {
+                    try {
+                        searcher.close();
+                    } catch (final IOException ioe) {
+                        logger.warn("Failed to close Index Searcher {} for {} due to {}",
+                                searcher.getSearcher(), absoluteFile, ioe);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.warn("", ioe);
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.trace("Borrowing index writer for {}", indexingDirectory);
+
+        lock.lock();
+        try {
+            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            if ( writerCount == null ) {
+                final List<Closeable> closeables = new ArrayList<>();
+                final Directory directory = FSDirectory.open(indexingDirectory);
+                closeables.add(directory);
+
+                try {
+                    final Analyzer analyzer = new StandardAnalyzer();
+                    closeables.add(analyzer);
+
+                    final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+                    config.setWriteLockTimeout(300000L);
+
+                    final IndexWriter indexWriter = new IndexWriter(directory, config);
+                    writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+                    logger.debug("Providing new index writer for {}", indexingDirectory);
+                } catch (final IOException ioe) {
+                    for ( final Closeable closeable : closeables ) {
+                        try {
+                            closeable.close();
+                        } catch (final IOException ioe2) {
+                            ioe.addSuppressed(ioe2);
+                        }
+                    }
+
+                    throw ioe;
+                }
+
+                writerCounts.put(absoluteFile, writerCount);
+
+                // Mark any active searchers as poisoned because we are updating the index
+                final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
+                if ( searchers != null ) {
+                    for (final ActiveIndexSearcher activeSearcher : searchers) {
+                        logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
+                        activeSearcher.poison();
+                    }
+                }
+            } else {
+                logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+            }
+
+            return writerCount.getWriter();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+        lock.lock();
+        try {
+            final IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+            try {
+                if ( count == null ) {
+                    logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+                            + "This could potentially lead to a resource leak", writer, indexingDirectory);
+                    writer.close();
+                } else if ( count.getCount() <= 1 ) {
+                    // we are finished with this writer.
+                    logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
+                    count.close();
+                } else {
+                    // decrement the count.
+                    logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+                    writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+                }
+            } catch (final IOException ioe) {
+                logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+                if ( logger.isDebugEnabled() ) {
+                    logger.warn("", ioe);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+        final File absoluteFile = indexDir.getAbsoluteFile();
+        logger.trace("Borrowing index searcher for {}", indexDir);
+
+        lock.lock();
+        try {
+            // check if we already have a reader cached.
+            List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+            if ( currentlyCached == null ) {
+                currentlyCached = new ArrayList<>();
+                activeSearchers.put(absoluteFile, currentlyCached);
+            } else {
+                // keep track of any searchers that have been closed so that we can remove them
+                // from our cache later.
+                for (final ActiveIndexSearcher searcher : currentlyCached) {
+                    if (searcher.isCache()) {
+                        // if the searcher is poisoned, we want to close and expire it.
+                        if (searcher.isPoisoned()) {
+                            continue;
+                        }
+
+                        // if there are no references to the reader, it will have been closed. Since there is no
+                        // isClosed() method, this is how we determine whether it's been closed or not.
+                        final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+                        if (refCount <= 0) {
+                            // if refCount == 0, then the reader has been closed, so we cannot use the searcher
+                            logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+                                + "removing cached searcher", absoluteFile, refCount);
+                            continue;
+                        }
+
+                        final int referenceCount = searcher.incrementReferenceCount();
+                        logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
+                        return searcher.getSearcher();
+                    }
+                }
+            }
+
+            // We found no cached Index Readers. Create a new one. To do this, we need to check
+            // if we have an Index Writer, and if so create a Reader based on the Index Writer.
+            // This will provide us a 'near real time' index reader.
+            final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            if ( writerCount == null ) {
+                final Directory directory = FSDirectory.open(absoluteFile);
+                logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+                try {
+                    final DirectoryReader directoryReader = DirectoryReader.open(directory);
+                    final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+                    // we want to cache the searcher that we create, since it's just a reader.
+                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
+                    currentlyCached.add(cached);
+
+                    return cached.getSearcher();
+                } catch (final IOException e) {
+                    logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
+                    logger.error("", e);
+
+                    try {
+                        directory.close();
+                    } catch (final IOException ioe) {
+                        e.addSuppressed(ioe);
+                    }
+
+                    throw e;
+                }
+            } else {
+                logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+                        + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+                // increment the writer count to ensure that it's kept open.
+                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+                // create a new Index Searcher from the writer so that we don't have an issue with trying
+                // to read from a directory that's locked. If we get the "no segments* file found" with
+                // Lucene, this indicates that an IndexWriter already has the directory open.
+                final IndexWriter writer = writerCount.getWriter();
+                final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+                final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+                // we don't want to cache this searcher because it's based on a writer, so we want to get
+                // new values the next time that we search.
+                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
+
+                currentlyCached.add(activeSearcher);
+                return activeSearcher.getSearcher();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
+
+        lock.lock();
+        try {
+            // check if we already have a reader cached.
+            final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+            if ( currentlyCached == null ) {
+                logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+                        + "result in a resource leak", indexDirectory);
+                return;
+            }
+
+            // Check if the given searcher is in our list. We use an Iterator to do this so that if we
+            // find it we can call remove() on the iterator if need be.
+            final Iterator<ActiveIndexSearcher> itr = new ArrayList<>(currentlyCached).iterator();
+            boolean activeSearcherFound = false;
+            while (itr.hasNext()) {
+                final ActiveIndexSearcher activeSearcher = itr.next();
+                if ( activeSearcher.getSearcher().equals(searcher) ) {
+                    activeSearcherFound = true;
+                    if ( activeSearcher.isCache() ) {
+                        // if the searcher is poisoned, close it and remove from "pool". Otherwise,
+                        // just decrement the count. Note here that when we call close() it won't actually close
+                        // the underlying directory reader unless there are no more references to it
+                        if ( activeSearcher.isPoisoned() ) {
+                            itr.remove();
+
+                            try {
+                                activeSearcher.close();
+                            } catch (final IOException ioe) {
+                                logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+                                if ( logger.isDebugEnabled() ) {
+                                    logger.warn("", ioe);
+                                }
+                            }
+
+                            return;
+                        } else {
+                            // the searcher is cached. Just leave it open.
+                            final int refCount = activeSearcher.decrementReferenceCount();
+                            logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
+                            return;
+                        }
+                    } else {
+                        // searcher is not cached. It was created from a writer, and we want
+                        // the newest updates the next time that we get a searcher, so we will
+                        // go ahead and close this one out.
+                        itr.remove();
+
+                        // decrement the writer count because we incremented it when creating the searcher
+                        final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+                        if ( writerCount != null ) {
+                            if ( writerCount.getCount() <= 1 ) {
+                                try {
+                                    logger.debug("Index searcher for {} is not cached. Writer count is "
+                                            + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+                                    writerCount.close();
+                                } catch (final IOException ioe) {
+                                    logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+                                    if ( logger.isDebugEnabled() ) {
+                                        logger.warn("", ioe);
+                                    }
+                                }
+                            } else {
+                                logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+                                        + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+                                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                                        writerCount.getAnalyzer(), writerCount.getDirectory(),
+                                        writerCount.getCount() - 1));
+                            }
+                        }
+
+                        try {
+                            logger.debug("Closing Index Searcher for {}", indexDirectory);
+                            final boolean allReferencesClosed = activeSearcher.close();
+                            if (!allReferencesClosed) {
+                                currentlyCached.add(activeSearcher);
+                            }
+                        } catch (final IOException ioe) {
+                            logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.warn("", ioe);
+                            }
+                        }
+                    }
+                }
+            }
+
+            if (!activeSearcherFound) {
+                logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. "
+                    + "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        logger.debug("Closing Index Manager");
+
+        lock.lock();
+        try {
+            IOException ioe = null;
+
+            for ( final IndexWriterCount count : writerCounts.values() ) {
+                try {
+                    count.close();
+                } catch (final IOException e) {
+                    if ( ioe == null ) {
+                        ioe = e;
+                    } else {
+                        ioe.addSuppressed(e);
+                    }
+                }
+            }
+
+            for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+                for (final ActiveIndexSearcher searcher : searcherList) {
+                    try {
+                        searcher.close();
+                    } catch (final IOException e) {
+                        if ( ioe == null ) {
+                            ioe = e;
+                        } else {
+                            ioe.addSuppressed(e);
+                        }
+                    }
+                }
+            }
+
+            if ( ioe != null ) {
+                throw ioe;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    private static void close(final Closeable... closeables) throws IOException {
+        IOException ioe = null;
+        for ( final Closeable closeable : closeables ) {
+            if ( closeable == null ) {
+                continue;
+            }
+
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                if ( ioe == null ) {
+                    ioe = e;
+                } else {
+                    ioe.addSuppressed(e);
+                }
+            }
+        }
+
+        if ( ioe != null ) {
+            throw ioe;
+        }
+    }
+
+
+    private static class ActiveIndexSearcher {
+        private final IndexSearcher searcher;
+        private final DirectoryReader directoryReader;
+        private final File indexDirectory;
+        private final Directory directory;
+        private final boolean cache;
+        private final AtomicInteger referenceCount = new AtomicInteger(1);
+        private volatile boolean poisoned = false;
+
+        public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
+                final Directory directory, final boolean cache) {
+            this.searcher = searcher;
+            this.directoryReader = directoryReader;
+            this.indexDirectory = indexDirectory;
+            this.directory = directory;
+            this.cache = cache;
+        }
+
+        public boolean isCache() {
+            return cache;
+        }
+
+        public IndexSearcher getSearcher() {
+            return searcher;
+        }
+
+        public boolean isPoisoned() {
+            return poisoned;
+        }
+
+        public void poison() {
+            this.poisoned = true;
+        }
+
+        public int incrementReferenceCount() {
+            return referenceCount.incrementAndGet();
+        }
+
+        public int decrementReferenceCount() {
+            return referenceCount.decrementAndGet();
+        }
+
+        public boolean close() throws IOException {
+            final int updatedRefCount = referenceCount.decrementAndGet();
+            if (updatedRefCount <= 0) {
+                logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
+                CachingIndexManager.close(directoryReader, directory);
+                return true;
+            } else {
+                logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
+                return false;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]";
+        }
+    }
+
+
+    private static class IndexWriterCount implements Closeable {
+        private final IndexWriter writer;
+        private final Analyzer analyzer;
+        private final Directory directory;
+        private final int count;
+
+        public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+            this.writer = writer;
+            this.analyzer = analyzer;
+            this.directory = directory;
+            this.count = count;
+        }
+
+        public Analyzer getAnalyzer() {
+            return analyzer;
+        }
+
+        public Directory getDirectory() {
+            return directory;
+        }
+
+        public IndexWriter getWriter() {
+            return writer;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        @Override
+        public void close() throws IOException {
+            CachingIndexManager.close(writer, analyzer, directory);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 3d38cac..f84021f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -14,522 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.provenance.lucene;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class IndexManager implements Closeable {
-    private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-
-    private final Lock lock = new ReentrantLock();
-    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
-    private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
-
-
-    public void removeIndex(final File indexDirectory) {
-        final File absoluteFile = indexDirectory.getAbsoluteFile();
-        logger.info("Removing index {}", indexDirectory);
-
-        lock.lock();
-        try {
-            final IndexWriterCount count = writerCounts.remove(absoluteFile);
-            if ( count != null ) {
-                try {
-                    count.close();
-                } catch (final IOException ioe) {
-                    logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
-                    if ( logger.isDebugEnabled() ) {
-                        logger.warn("", ioe);
-                    }
-                }
-            }
-
-            for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
-                for ( final ActiveIndexSearcher searcher : searcherList ) {
-                    try {
-                        searcher.close();
-                    } catch (final IOException ioe) {
-                        logger.warn("Failed to close Index Searcher {} for {} due to {}",
-                                searcher.getSearcher(), absoluteFile, ioe);
-                        if ( logger.isDebugEnabled() ) {
-                            logger.warn("", ioe);
-                        }
-                    }
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
-        final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.trace("Borrowing index writer for {}", indexingDirectory);
-
-        lock.lock();
-        try {
-            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-            if ( writerCount == null ) {
-                final List<Closeable> closeables = new ArrayList<>();
-                final Directory directory = FSDirectory.open(indexingDirectory);
-                closeables.add(directory);
-
-                try {
-                    final Analyzer analyzer = new StandardAnalyzer();
-                    closeables.add(analyzer);
-
-                    final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
-                    config.setWriteLockTimeout(300000L);
-
-                    final IndexWriter indexWriter = new IndexWriter(directory, config);
-                    writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
-                    logger.debug("Providing new index writer for {}", indexingDirectory);
-                } catch (final IOException ioe) {
-                    for ( final Closeable closeable : closeables ) {
-                        try {
-                            closeable.close();
-                        } catch (final IOException ioe2) {
-                            ioe.addSuppressed(ioe2);
-                        }
-                    }
-
-                    throw ioe;
-                }
-
-                writerCounts.put(absoluteFile, writerCount);
-
-                // Mark any active searchers as poisoned because we are updating the index
-                final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
-                if ( searchers != null ) {
-                    for (final ActiveIndexSearcher activeSearcher : searchers) {
-                        logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
-                        activeSearcher.poison();
-                    }
-                }
-            } else {
-                logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
-                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-            }
-
-            return writerCount.getWriter();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
-        final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
-
-        lock.lock();
-        try {
-            final IndexWriterCount count = writerCounts.remove(absoluteFile);
-
-            try {
-                if ( count == null ) {
-                    logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
-                            + "This could potentially lead to a resource leak", writer, indexingDirectory);
-                    writer.close();
-                } else if ( count.getCount() <= 1 ) {
-                    // we are finished with this writer.
-                    logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
-                    count.close();
-                } else {
-                    // decrement the count.
-                    logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
-                    writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
-                }
-            } catch (final IOException ioe) {
-                logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
-                if ( logger.isDebugEnabled() ) {
-                    logger.warn("", ioe);
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-
-    public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
-        final File absoluteFile = indexDir.getAbsoluteFile();
-        logger.trace("Borrowing index searcher for {}", indexDir);
-
-        lock.lock();
-        try {
-            // check if we already have a reader cached.
-            List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-            if ( currentlyCached == null ) {
-                currentlyCached = new ArrayList<>();
-                activeSearchers.put(absoluteFile, currentlyCached);
-            } else {
-                // keep track of any searchers that have been closed so that we can remove them
-                // from our cache later.
-                for (final ActiveIndexSearcher searcher : currentlyCached) {
-                    if (searcher.isCache()) {
-                        // if the searcher is poisoned, we want to close and expire it.
-                        if (searcher.isPoisoned()) {
-                            continue;
-                        }
-
-                        // if there are no references to the reader, it will have been closed. Since there is no
-                        // isClosed() method, this is how we determine whether it's been closed or not.
-                        final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
-                        if (refCount <= 0) {
-                            // if refCount == 0, then the reader has been closed, so we cannot use the searcher
-                            logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
-                                + "removing cached searcher", absoluteFile, refCount);
-                            continue;
-                        }
-
-                        final int referenceCount = searcher.incrementReferenceCount();
-                        logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
-                        return searcher.getSearcher();
-                    }
-                }
-            }
-
-            // We found no cached Index Readers. Create a new one. To do this, we need to check
-            // if we have an Index Writer, and if so create a Reader based on the Index Writer.
-            // This will provide us a 'near real time' index reader.
-            final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-            if ( writerCount == null ) {
-                final Directory directory = FSDirectory.open(absoluteFile);
-                logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
-
-                try {
-                    final DirectoryReader directoryReader = DirectoryReader.open(directory);
-                    final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
-                    // we want to cache the searcher that we create, since it's just a reader.
-                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
-                    currentlyCached.add(cached);
-
-                    return cached.getSearcher();
-                } catch (final IOException e) {
-                    logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
-                    logger.error("", e);
-
-                    try {
-                        directory.close();
-                    } catch (final IOException ioe) {
-                        e.addSuppressed(ioe);
-                    }
-
-                    throw e;
-                }
-            } else {
-                logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
-                        + "counter to {}", indexDir, writerCount.getCount() + 1);
-
-                // increment the writer count to ensure that it's kept open.
-                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-
-                // create a new Index Searcher from the writer so that we don't have an issue with trying
-                // to read from a directory that's locked. If we get the "no segments* file found" with
-                // Lucene, this indicates that an IndexWriter already has the directory open.
-                final IndexWriter writer = writerCount.getWriter();
-                final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
-                final IndexSearcher searcher = new IndexSearcher(directoryReader);
-
-                // we don't want to cache this searcher because it's based on a writer, so we want to get
-                // new values the next time that we search.
-                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
-
-                currentlyCached.add(activeSearcher);
-                return activeSearcher.getSearcher();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-
-    public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
-        final File absoluteFile = indexDirectory.getAbsoluteFile();
-        logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
-
-        lock.lock();
-        try {
-            // check if we already have a reader cached.
-            final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-            if ( currentlyCached == null ) {
-                logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
-                        + "result in a resource leak", indexDirectory);
-                return;
-            }
-
-            // Check if the given searcher is in our list. We use an Iterator to do this so that if we
-            // find it we can call remove() on the iterator if need be.
-            final Iterator<ActiveIndexSearcher> itr = new ArrayList<>(currentlyCached).iterator();
-            boolean activeSearcherFound = false;
-            while (itr.hasNext()) {
-                final ActiveIndexSearcher activeSearcher = itr.next();
-                if ( activeSearcher.getSearcher().equals(searcher) ) {
-                    activeSearcherFound = true;
-                    if ( activeSearcher.isCache() ) {
-                        // if the searcher is poisoned, close it and remove from "pool". Otherwise,
-                        // just decrement the count. Note here that when we call close() it won't actually close
-                        // the underlying directory reader unless there are no more references to it
-                        if ( activeSearcher.isPoisoned() ) {
-                            itr.remove();
-
-                            try {
-                                activeSearcher.close();
-                            } catch (final IOException ioe) {
-                                logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
-                                if ( logger.isDebugEnabled() ) {
-                                    logger.warn("", ioe);
-                                }
-                            }
-
-                            return;
-                        } else {
-                            // the searcher is cached. Just leave it open.
-                            final int refCount = activeSearcher.decrementReferenceCount();
-                            logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
-                            return;
-                        }
-                    } else {
-                        // searcher is not cached. It was created from a writer, and we want
-                        // the newest updates the next time that we get a searcher, so we will
-                        // go ahead and close this one out.
-                        itr.remove();
-
-                        // decrement the writer count because we incremented it when creating the searcher
-                        final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-                        if ( writerCount != null ) {
-                            if ( writerCount.getCount() <= 1 ) {
-                                try {
-                                    logger.debug("Index searcher for {} is not cached. Writer count is "
-                                            + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
-
-                                    writerCount.close();
-                                } catch (final IOException ioe) {
-                                    logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
-                                    if ( logger.isDebugEnabled() ) {
-                                        logger.warn("", ioe);
-                                    }
-                                }
-                            } else {
-                                logger.debug("Index searcher for {} is not cached. Writer count is decremented "
-                                        + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
-
-                                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-                                        writerCount.getAnalyzer(), writerCount.getDirectory(),
-                                        writerCount.getCount() - 1));
-                            }
-                        }
-
-                        try {
-                            logger.debug("Closing Index Searcher for {}", indexDirectory);
-                            final boolean allReferencesClosed = activeSearcher.close();
-                            if (!allReferencesClosed) {
-                                currentlyCached.add(activeSearcher);
-                            }
-                        } catch (final IOException ioe) {
-                            logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
-                            if ( logger.isDebugEnabled() ) {
-                                logger.warn("", ioe);
-                            }
-                        }
-                    }
-                }
-            }
-
-            if (!activeSearcherFound) {
-                logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. "
-                    + "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory);
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        logger.debug("Closing Index Manager");
-
-        lock.lock();
-        try {
-            IOException ioe = null;
-
-            for ( final IndexWriterCount count : writerCounts.values() ) {
-                try {
-                    count.close();
-                } catch (final IOException e) {
-                    if ( ioe == null ) {
-                        ioe = e;
-                    } else {
-                        ioe.addSuppressed(e);
-                    }
-                }
-            }
-
-            for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
-                for (final ActiveIndexSearcher searcher : searcherList) {
-                    try {
-                        searcher.close();
-                    } catch (final IOException e) {
-                        if ( ioe == null ) {
-                            ioe = e;
-                        } else {
-                            ioe.addSuppressed(e);
-                        }
-                    }
-                }
-            }
-
-            if ( ioe != null ) {
-                throw ioe;
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
-
-    private static void close(final Closeable... closeables) throws IOException {
-        IOException ioe = null;
-        for ( final Closeable closeable : closeables ) {
-            if ( closeable == null ) {
-                continue;
-            }
-
-            try {
-                closeable.close();
-            } catch (final IOException e) {
-                if ( ioe == null ) {
-                    ioe = e;
-                } else {
-                    ioe.addSuppressed(e);
-                }
-            }
-        }
-
-        if ( ioe != null ) {
-            throw ioe;
-        }
-    }
-
-
-    private static class ActiveIndexSearcher {
-        private final IndexSearcher searcher;
-        private final DirectoryReader directoryReader;
-        private final File indexDirectory;
-        private final Directory directory;
-        private final boolean cache;
-        private final AtomicInteger referenceCount = new AtomicInteger(1);
-        private volatile boolean poisoned = false;
-
-        public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
-                final Directory directory, final boolean cache) {
-            this.searcher = searcher;
-            this.directoryReader = directoryReader;
-            this.indexDirectory = indexDirectory;
-            this.directory = directory;
-            this.cache = cache;
-        }
-
-        public boolean isCache() {
-            return cache;
-        }
-
-        public IndexSearcher getSearcher() {
-            return searcher;
-        }
-
-        public boolean isPoisoned() {
-            return poisoned;
-        }
-
-        public void poison() {
-            this.poisoned = true;
-        }
-
-        public int incrementReferenceCount() {
-            return referenceCount.incrementAndGet();
-        }
-
-        public int decrementReferenceCount() {
-            return referenceCount.decrementAndGet();
-        }
-
-        public boolean close() throws IOException {
-            final int updatedRefCount = referenceCount.decrementAndGet();
-            if (updatedRefCount <= 0) {
-                logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
-                IndexManager.close(directoryReader, directory);
-                return true;
-            } else {
-                logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
-                return false;
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]";
-        }
-    }
-
-
-    private static class IndexWriterCount implements Closeable {
-        private final IndexWriter writer;
-        private final Analyzer analyzer;
-        private final Directory directory;
-        private final int count;
-
-        public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
-            this.writer = writer;
-            this.analyzer = analyzer;
-            this.directory = directory;
-            this.count = count;
-        }
-
-        public Analyzer getAnalyzer() {
-            return analyzer;
-        }
 
-        public Directory getDirectory() {
-            return directory;
-        }
+public interface IndexManager extends Closeable {
+    IndexSearcher borrowIndexSearcher(File indexDir) throws IOException;
 
-        public IndexWriter getWriter() {
-            return writer;
-        }
+    IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException;
 
-        public int getCount() {
-            return count;
-        }
+    void removeIndex(final File indexDirectory);
 
-        @Override
-        public void close() throws IOException {
-            IndexManager.close(writer, analyzer, directory);
-        }
-    }
+    void returnIndexSearcher(File indexDirectory, IndexSearcher searcher);
 
-}
+    void returnIndexWriter(File indexingDirectory, IndexWriter writer);
+}
\ No newline at end of file

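A minimal caller-side sketch of the borrow/return contract this interface defines. The class IndexManagerUsageSketch and its countDocs method are illustrative names only and are not part of this commit:

    import java.io.File;
    import java.io.IOException;

    import org.apache.lucene.search.IndexSearcher;
    import org.apache.lucene.search.MatchAllDocsQuery;
    import org.apache.lucene.search.TopDocs;
    import org.apache.nifi.provenance.lucene.IndexManager;

    // Hypothetical usage sketch -- not part of this commit.
    public class IndexManagerUsageSketch {
        // Borrow a searcher, use it, and always return it in a finally block so the
        // implementation can release (or re-cache) the underlying reader resources.
        public static int countDocs(final IndexManager manager, final File indexDir) throws IOException {
            final IndexSearcher searcher = manager.borrowIndexSearcher(indexDir);
            try {
                final TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 1);
                return topDocs.totalHits;
            } finally {
                manager.returnIndexSearcher(indexDir, searcher);
            }
        }
    }
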
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
new file mode 100644
index 0000000..daf6413
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleIndexManager implements IndexManager {
+    private static final Logger logger = LoggerFactory.getLogger(SimpleIndexManager.class);
+
+    private final ConcurrentMap<Object, List<Closeable>> closeables = new ConcurrentHashMap<>();
+    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+        logger.debug("Creating index searcher for {}", indexDir);
+        final Directory directory = FSDirectory.open(indexDir);
+        final DirectoryReader directoryReader = DirectoryReader.open(directory);
+        final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+        final List<Closeable> closeableList = new ArrayList<>(2);
+        closeableList.add(directoryReader);
+        closeableList.add(directory);
+        closeables.put(searcher, closeableList);
+        logger.debug("Created index searcher {} for {}", searcher, indexDir);
+
+        return searcher;
+    }
+
+    @Override
+    public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+        logger.debug("Closing index searcher {} for {}", searcher, indexDirectory);
+
+        final List<Closeable> closeableList = closeables.remove(searcher);
+        if (closeableList != null) {
+            for (final Closeable closeable : closeableList) {
+                closeQuietly(closeable);
+            }
+        }
+
+        logger.debug("Closed index searcher {}", searcher);
+    }
+
+    @Override
+    public void removeIndex(final File indexDirectory) {
+    }
+
+
+    @Override
+    public synchronized IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.trace("Borrowing index writer for {}", indexingDirectory);
+
+        IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+        if (writerCount == null) {
+            final List<Closeable> closeables = new ArrayList<>();
+            final Directory directory = FSDirectory.open(indexingDirectory);
+            closeables.add(directory);
+
+            try {
+                final Analyzer analyzer = new StandardAnalyzer();
+                closeables.add(analyzer);
+
+                final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
+                config.setWriteLockTimeout(300000L);
+
+                final IndexWriter indexWriter = new IndexWriter(directory, config);
+                writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
+                logger.debug("Providing new index writer for {}", indexingDirectory);
+            } catch (final IOException ioe) {
+                for (final Closeable closeable : closeables) {
+                    try {
+                        closeable.close();
+                    } catch (final IOException ioe2) {
+                        ioe.addSuppressed(ioe2);
+                    }
+                }
+
+                throw ioe;
+            }
+
+            writerCounts.put(absoluteFile, writerCount);
+        } else {
+            logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+            writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+        }
+
+        return writerCount.getWriter();
+    }
+
+
+    @Override
+    public synchronized void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+        final IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+        try {
+            if (count == null) {
+                logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+                    + "This could potentially lead to a resource leak", writer, indexingDirectory);
+                writer.close();
+            } else if (count.getCount() <= 1) {
+                // we are finished with this writer.
+                logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
+                writer.commit();
+                count.close();
+            } else {
+                // decrement the count.
+                logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+                writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+            }
+        } catch (final IOException ioe) {
+            logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+            if (logger.isDebugEnabled()) {
+                logger.warn("", ioe);
+            }
+        }
+    }
+
+    private static void closeQuietly(final Closeable... closeables) {
+        for (final Closeable closeable : closeables) {
+            if (closeable == null) {
+                continue;
+            }
+
+            try {
+                closeable.close();
+            } catch (final Exception e) {
+                logger.warn("Failed to close {} due to {}", closeable, e);
+            }
+        }
+    }
+
+
+    private static class IndexWriterCount implements Closeable {
+        private final IndexWriter writer;
+        private final Analyzer analyzer;
+        private final Directory directory;
+        private final int count;
+
+        public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+            this.writer = writer;
+            this.analyzer = analyzer;
+            this.directory = directory;
+            this.count = count;
+        }
+
+        public Analyzer getAnalyzer() {
+            return analyzer;
+        }
+
+        public Directory getDirectory() {
+            return directory;
+        }
+
+        public IndexWriter getWriter() {
+            return writer;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        @Override
+        public void close() throws IOException {
+            closeQuietly(writer, analyzer, directory);
+        }
+    }
+}

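A matching sketch for the writer side of SimpleIndexManager: each borrowIndexWriter() call must be paired with a returnIndexWriter() call, because the manager reference-counts the shared writer per directory and only commits and closes it when the last borrower returns it. The class SimpleIndexManagerUsageSketch and its indexOneDocument method are illustrative names only, not part of this commit:

    import java.io.File;
    import java.io.IOException;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.nifi.provenance.lucene.SimpleIndexManager;

    // Hypothetical usage sketch -- not part of this commit.
    public class SimpleIndexManagerUsageSketch {
        // Borrow the (possibly shared) writer for this index directory, add a document,
        // then return the writer so its reference count is decremented.
        public static void indexOneDocument(final SimpleIndexManager manager, final File indexDir, final String id) throws IOException {
            final IndexWriter writer = manager.borrowIndexWriter(indexDir);
            try {
                final Document doc = new Document();
                doc.add(new StringField("id", id, Store.YES));
                writer.addDocument(doc);
            } finally {
                manager.returnIndexWriter(indexDir, writer);
            }
        }
    }
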
http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 845d6ea..a210aa9 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -64,6 +64,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
 import org.apache.nifi.provenance.lineage.LineageNode;
 import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lucene.CachingIndexManager;
 import org.apache.nifi.provenance.lucene.IndexManager;
 import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.search.Query;
@@ -496,16 +497,16 @@ public class TestPersistentProvenanceRepository {
 
         final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2);
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
-            private IndexManager wrappedManager = null;
+            private CachingIndexManager wrappedManager = null;
 
             // Create an IndexManager that adds a delay before returning the Index Searcher.
             @Override
-            protected synchronized IndexManager getIndexManager() {
+            protected synchronized CachingIndexManager getIndexManager() {
                 if (wrappedManager == null) {
                     final IndexManager mgr = super.getIndexManager();
                     final Logger logger = LoggerFactory.getLogger("IndexManager");
 
-                    wrappedManager = new IndexManager() {
+                    wrappedManager = new CachingIndexManager() {
                         final AtomicInteger indexSearcherCount = new AtomicInteger(0);
 
                         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
new file mode 100644
index 0000000..36f0b00
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestCachingIndexManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.lucene;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCachingIndexManager {
+
+    private File indexDir;
+    private CachingIndexManager manager;
+
+    @Before
+    public void setup() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+        manager = new CachingIndexManager();
+
+        indexDir = new File("target/testIndexManager/" + UUID.randomUUID().toString());
+        indexDir.mkdirs();
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        manager.close();
+
+        FileUtils.deleteFiles(Collections.singleton(indexDir), true);
+    }
+
+    @Test
+    public void test() throws IOException {
+        // Create an IndexWriter and add a document to the index, then close the writer.
+        // This gives us something that we can query.
+        final IndexWriter writer = manager.borrowIndexWriter(indexDir);
+        final Document doc = new Document();
+        doc.add(new StringField("unit test", "true", Store.YES));
+        writer.addDocument(doc);
+        manager.returnIndexWriter(indexDir, writer);
+
+        // Get an Index Searcher that we can use to query the index.
+        final IndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir);
+
+        // Ensure that we get the expected results.
+        assertCount(cachedSearcher, 1);
+
+        // While we already have an Index Searcher, get a writer for the same index.
+        // This will cause the Index Searcher to be marked as poisoned.
+        final IndexWriter writer2 = manager.borrowIndexWriter(indexDir);
+
+        // Obtain a new Index Searcher with the writer open. This Index Searcher should *NOT*
+        // be the same as the previous searcher because the new one will be a Near-Real-Time Index Searcher
+        // while the other is not.
+        final IndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir);
+        assertNotSame(cachedSearcher, nrtSearcher);
+
+        // Ensure that we get the expected query results.
+        assertCount(nrtSearcher, 1);
+
+        // Return the writer, so that there is no longer an active writer for the index.
+        manager.returnIndexWriter(indexDir, writer2);
+
+        // Ensure that we still get the same result.
+        assertCount(cachedSearcher, 1);
+        manager.returnIndexSearcher(indexDir, cachedSearcher);
+
+        // Ensure that our near-real-time index searcher still gets the same result.
+        assertCount(nrtSearcher, 1);
+        manager.returnIndexSearcher(indexDir, nrtSearcher);
+    }
+
+    private void assertCount(final IndexSearcher searcher, final int count) throws IOException {
+        final BooleanQuery query = new BooleanQuery();
+        query.add(new BooleanClause(new TermQuery(new Term("unit test", "true")), Occur.MUST));
+        final TopDocs topDocs = searcher.search(query, count * 10);
+        assertNotNull(topDocs);
+        assertEquals(1, topDocs.totalHits);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/08812545/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
deleted file mode 100644
index afe4512..0000000
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestIndexManager.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.provenance.lucene;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.UUID;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestIndexManager {
-
-    private File indexDir;
-    private IndexManager manager;
-
-    @Before
-    public void setup() {
-        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
-        manager = new IndexManager();
-
-        indexDir = new File("target/testIndexManager/" + UUID.randomUUID().toString());
-        indexDir.mkdirs();
-    }
-
-    @After
-    public void cleanup() throws IOException {
-        manager.close();
-
-        FileUtils.deleteFiles(Collections.singleton(indexDir), true);
-    }
-
-    @Test
-    public void test() throws IOException {
-        // Create an IndexWriter and add a document to the index, then close the writer.
-        // This gives us something that we can query.
-        final IndexWriter writer = manager.borrowIndexWriter(indexDir);
-        final Document doc = new Document();
-        doc.add(new StringField("unit test", "true", Store.YES));
-        writer.addDocument(doc);
-        manager.returnIndexWriter(indexDir, writer);
-
-        // Get an Index Searcher that we can use to query the index.
-        final IndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir);
-
-        // Ensure that we get the expected results.
-        assertCount(cachedSearcher, 1);
-
-        // While we already have an Index Searcher, get a writer for the same index.
-        // This will cause the Index Searcher to be marked as poisoned.
-        final IndexWriter writer2 = manager.borrowIndexWriter(indexDir);
-
-        // Obtain a new Index Searcher with the writer open. This Index Searcher should *NOT*
-        // be the same as the previous searcher because the new one will be a Near-Real-Time Index Searcher
-        // while the other is not.
-        final IndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir);
-        assertNotSame(cachedSearcher, nrtSearcher);
-
-        // Ensure that we get the expected query results.
-        assertCount(nrtSearcher, 1);
-
-        // Return the writer, so that there is no longer an active writer for the index.
-        manager.returnIndexWriter(indexDir, writer2);
-
-        // Ensure that we still get the same result.
-        assertCount(cachedSearcher, 1);
-        manager.returnIndexSearcher(indexDir, cachedSearcher);
-
-        // Ensure that our near-real-time index searcher still gets the same result.
-        assertCount(nrtSearcher, 1);
-        manager.returnIndexSearcher(indexDir, nrtSearcher);
-    }
-
-    private void assertCount(final IndexSearcher searcher, final int count) throws IOException {
-        final BooleanQuery query = new BooleanQuery();
-        query.add(new BooleanClause(new TermQuery(new Term("unit test", "true")), Occur.MUST));
-        final TopDocs topDocs = searcher.search(query, count * 10);
-        assertNotNull(topDocs);
-        assertEquals(1, topDocs.totalHits);
-    }
-}