You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2016/10/06 17:19:07 UTC

[3/3] nifi git commit: NIFI-2452: Ensure that we do not close Index Readers that are still in use

NIFI-2452: Ensure that we do not close Index Readers that are still in use

Signed-off-by: Joe Skora <js...@apache.org>

This closes #1072.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/db189e3b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/db189e3b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/db189e3b

Branch: refs/heads/0.x
Commit: db189e3b3df8d2d6f67e2548215a2b775b47cfb0
Parents: 83fdced
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Aug 1 14:51:02 2016 -0400
Committer: Joe Skora <js...@apache.org>
Committed: Thu Oct 6 13:17:44 2016 -0400

----------------------------------------------------------------------
 .../nifi/provenance/IndexConfiguration.java     |   3 +-
 .../PersistentProvenanceRepository.java         |  20 ++-
 .../nifi/provenance/lucene/IndexManager.java    |  55 +++++--
 .../nifi/provenance/lucene/IndexSearch.java     |   3 +-
 .../nifi/provenance/lucene/LineageQuery.java    |   1 +
 .../TestPersistentProvenanceRepository.java     | 163 +++++++++++++++++++
 6 files changed, 223 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index 4e80811..af7bff5 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -212,13 +212,14 @@ public class IndexConfiguration {
         final List<File> dirs = new ArrayList<>();
         lock.lock();
         try {
+            // Sort directories so that we return the newest index first
             final List<File> sortedIndexDirectories = getIndexDirectories();
             Collections.sort(sortedIndexDirectories, new Comparator<File>() {
                 @Override
                 public int compare(final File o1, final File o2) {
                     final long epochTimestamp1 = getIndexStartTime(o1);
                     final long epochTimestamp2 = getIndexStartTime(o2);
-                    return Long.compare(epochTimestamp1, epochTimestamp2);
+                    return Long.compare(epochTimestamp2, epochTimestamp1);
                 }
             });
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index d7ab2d7..8b971b5 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -207,6 +207,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread"));
     }
 
+    protected IndexManager getIndexManager() {
+        return indexManager;
+    }
+
     @Override
     public void initialize(final EventReporter eventReporter) throws IOException {
         writeLock.lock();
@@ -641,7 +645,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             rolloverExecutor.shutdownNow();
             queryExecService.shutdownNow();
 
-            indexManager.close();
+            getIndexManager().close();
 
             if ( writers != null ) {
                 for (final RecordWriter writer : writers) {
@@ -1006,7 +1010,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             // we can safely delete the first index because the latest event in the index is an event
             // that has already been expired from the repository.
             final File indexingDirectory = indexDirs.get(0);
-            indexManager.removeIndex(indexingDirectory);
+            getIndexManager().removeIndex(indexingDirectory);
             indexConfig.removeIndexDirectory(indexingDirectory);
             deleteDirectory(indexingDirectory);
 
@@ -1474,7 +1478,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
                         + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
 
-                final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
+                final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, getIndexManager());
                 try {
                     deleteAction.execute(suggestedMergeFile);
                 } catch (final Exception e) {
@@ -1610,7 +1614,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 final AtomicBoolean finishedAdding = new AtomicBoolean(false);
                 final List<Future<?>> futures = new ArrayList<>();
 
-                final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
+                final IndexWriter indexWriter = getIndexManager().borrowIndexWriter(indexingDirectory);
                 try {
                     final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() {
                         @Override
@@ -1733,7 +1737,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                         }
                     }
                 } finally {
-                    indexManager.returnIndexWriter(indexingDirectory, indexWriter);
+                    getIndexManager().returnIndexWriter(indexingDirectory, indexWriter);
                 }
 
                 indexConfig.setMaxIdIndexed(maxId);
@@ -1934,7 +1938,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
      * @return an Iterator of ProvenanceEventRecord that match the query
      * @throws IOException if unable to perform the query
      */
-    public Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
+    Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
         final List<File> indexFiles = indexConfig.getIndexDirectories();
 
         final AtomicLong hits = new AtomicLong(0L);
@@ -2366,7 +2370,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         @Override
         public void run() {
             try {
-                final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
+                final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, getIndexManager(), maxAttributeChars);
                 final StandardQueryResult queryResult = search.search(query, retrievalCount, firstEventTimestamp);
                 submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
                 if (queryResult.isFinished()) {
@@ -2408,7 +2412,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
             try {
                 final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this,
-                    indexManager, indexDir, null, flowFileUuids, maxAttributeChars);
+                    getIndexManager(), indexDir, null, flowFileUuids, maxAttributeChars);
 
                 final StandardLineageResult result = submission.getResult();
                 result.update(matchingRecords);

http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 57d0d78..07cd190 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -86,7 +87,7 @@ public class IndexManager implements Closeable {
 
     public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
         final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.debug("Borrowing index writer for {}", indexingDirectory);
+        logger.trace("Borrowing index writer for {}", indexingDirectory);
 
         lock.lock();
         try {
@@ -124,6 +125,7 @@ public class IndexManager implements Closeable {
                 final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
                 if ( searchers != null ) {
                     for (final ActiveIndexSearcher activeSearcher : searchers) {
+                        logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
                         activeSearcher.poison();
                     }
                 }
@@ -141,7 +143,7 @@ public class IndexManager implements Closeable {
 
     public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
         final File absoluteFile = indexingDirectory.getAbsoluteFile();
-        logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+        logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
 
         lock.lock();
         try {
@@ -154,7 +156,7 @@ public class IndexManager implements Closeable {
                     writer.close();
                 } else if ( count.getCount() <= 1 ) {
                     // we are finished with this writer.
-                    logger.debug("Closing Index Writer for {}", indexingDirectory);
+                    logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
                     count.close();
                 } else {
                     // decrement the count.
@@ -175,7 +177,7 @@ public class IndexManager implements Closeable {
 
     public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
         final File absoluteFile = indexDir.getAbsoluteFile();
-        logger.debug("Borrowing index searcher for {}", indexDir);
+        logger.trace("Borrowing index searcher for {}", indexDir);
 
         lock.lock();
         try {
@@ -210,7 +212,8 @@ public class IndexManager implements Closeable {
                                 continue;
                             }
 
-                            logger.debug("Providing previously cached index searcher for {}", indexDir);
+                            final int referenceCount = searcher.incrementReferenceCount();
+                            logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
                             return searcher.getSearcher();
                         }
                     }
@@ -219,7 +222,9 @@ public class IndexManager implements Closeable {
                     // from the cache so that we don't try to use them again later.
                     for ( final ActiveIndexSearcher searcher : expired ) {
                         try {
+                            logger.debug("Closing {}", searcher);
                             searcher.close();
+                            logger.trace("Closed {}", searcher);
                         } catch (final Exception e) {
                             logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
                         }
@@ -239,11 +244,14 @@ public class IndexManager implements Closeable {
                     final IndexSearcher searcher = new IndexSearcher(directoryReader);
 
                     // we want to cache the searcher that we create, since it's just a reader.
-                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
                     currentlyCached.add(cached);
 
                     return cached.getSearcher();
                 } catch (final IOException e) {
+                    logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
+                    logger.error("", e);
+
                     try {
                         directory.close();
                     } catch (final IOException ioe) {
@@ -269,7 +277,7 @@ public class IndexManager implements Closeable {
 
                 // we don't want to cache this searcher because it's based on a writer, so we want to get
                 // new values the next time that we search.
-                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
 
                 currentlyCached.add(activeSearcher);
                 return activeSearcher.getSearcher();
@@ -282,7 +290,7 @@ public class IndexManager implements Closeable {
 
     public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
         final File absoluteFile = indexDirectory.getAbsoluteFile();
-        logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+        logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
 
         lock.lock();
         try {
@@ -318,7 +326,8 @@ public class IndexManager implements Closeable {
                             return;
                         } else {
                             // the searcher is cached. Just leave it open.
-                            logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+                            final int refCount = activeSearcher.decrementReferenceCount();
+                            logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
                             return;
                         }
                     } else {
@@ -439,14 +448,17 @@ public class IndexManager implements Closeable {
     private static class ActiveIndexSearcher implements Closeable {
         private final IndexSearcher searcher;
         private final DirectoryReader directoryReader;
+        private final File indexDirectory;
         private final Directory directory;
         private final boolean cache;
-        private boolean poisoned = false;
+        private final AtomicInteger referenceCount = new AtomicInteger(1);
+        private volatile boolean poisoned = false;
 
-        public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
+        public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
                 final Directory directory, final boolean cache) {
             this.searcher = searcher;
             this.directoryReader = directoryReader;
+            this.indexDirectory = indexDirectory;
             this.directory = directory;
             this.cache = cache;
         }
@@ -467,9 +479,28 @@ public class IndexManager implements Closeable {
             this.poisoned = true;
         }
 
+        public int incrementReferenceCount() {
+            return referenceCount.incrementAndGet();
+        }
+
+        public int decrementReferenceCount() {
+            return referenceCount.decrementAndGet();
+        }
+
         @Override
         public void close() throws IOException {
-            IndexManager.close(directoryReader, directory);
+            final int updatedRefCount = referenceCount.decrementAndGet();
+            if (updatedRefCount <= 0) {
+                logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
+                IndexManager.close(directoryReader, directory);
+            } else {
+                logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]";
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index b8661df..03a8df9 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -77,11 +77,12 @@ public class IndexSearch {
             final long searchStartNanos = System.nanoTime();
             final long openSearcherNanos = searchStartNanos - start;
 
+            logger.debug("Searching {} for {}", this, provenanceQuery);
             final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
             final long finishSearch = System.nanoTime();
             final long searchNanos = finishSearch - searchStartNanos;
 
-            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
+            logger.debug("Searching {} for {} took {} millis; opening searcher took {} millis", this, provenanceQuery,
                     TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
 
             if (topDocs.totalHits == 0) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index ce60e03..af738cd 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -90,6 +90,7 @@ public class LineageQuery {
                 }
 
                 final long searchStart = System.nanoTime();
+                logger.debug("Searching {} for {}", indexDirectory, query);
                 final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS);
                 final long searchEnd = System.nanoTime();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index faaef87..0aa0d0f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -59,6 +60,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
 import org.apache.nifi.provenance.lineage.LineageEdge;
 import org.apache.nifi.provenance.lineage.LineageNode;
 import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lucene.IndexManager;
 import org.apache.nifi.provenance.lucene.IndexingAction;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QueryResult;
@@ -82,6 +84,8 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestPersistentProvenanceRepository {
 
@@ -522,6 +526,165 @@ public class TestPersistentProvenanceRepository {
         assertTrue(newRecordSet.getMatchingEvents().isEmpty());
     }
 
+    // TODO: Switch the timeout below to 10,000 (milliseconds).
+    @Test(timeout = 1000000)
+    public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L * 10);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L * 10);
+        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+
+        final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2);
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+            private IndexManager wrappedManager = null;
+
+            // Create an IndexManager that adds a delay before returning the Index Searcher.
+            @Override
+            protected synchronized IndexManager getIndexManager() {
+                if (wrappedManager == null) {
+                    final IndexManager mgr = super.getIndexManager();
+                    final Logger logger = LoggerFactory.getLogger("IndexManager");
+
+                    wrappedManager = new IndexManager() {
+                        final AtomicInteger indexSearcherCount = new AtomicInteger(0);
+
+                        @Override
+                        public IndexSearcher borrowIndexSearcher(File indexDir) throws IOException {
+                            final IndexSearcher searcher = mgr.borrowIndexSearcher(indexDir);
+                            final int idx = indexSearcherCount.incrementAndGet();
+                            obtainIndexSearcherLatch.countDown();
+
+                            // The first searcher should sleep for 3 seconds. The second searcher should
+                            // sleep for 5 seconds. This allows us to have two threads each obtain a Searcher
+                            // and then have one of them finish searching and close the searcher if it's poisoned while the
+                            // second thread is still holding the searcher
+                            try {
+                                if (idx == 1) {
+                                    Thread.sleep(3000L);
+                                } else {
+                                    Thread.sleep(5000L);
+                                }
+                            } catch (InterruptedException e) {
+                                throw new IOException("Interrupted", e);
+                            }
+
+                            logger.info("Releasing index searcher");
+                            return searcher;
+                        }
+
+                        @Override
+                        public IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException {
+                            return mgr.borrowIndexWriter(indexingDirectory);
+                        }
+
+                        @Override
+                        public void close() throws IOException {
+                            mgr.close();
+                        }
+
+                        @Override
+                        public void removeIndex(File indexDirectory) {
+                            mgr.removeIndex(indexDirectory);
+                        }
+
+                        @Override
+                        public void returnIndexSearcher(File indexDirectory, IndexSearcher searcher) {
+                            mgr.returnIndexSearcher(indexDirectory, searcher);
+                        }
+
+                        @Override
+                        public void returnIndexWriter(File indexingDirectory, IndexWriter writer) {
+                            mgr.returnIndexWriter(indexingDirectory, writer);
+                        }
+                    };
+                }
+
+                return wrappedManager;
+            }
+        };
+
+        repo.initialize(getEventReporter());
+
+        final String uuid = "10000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            repo.registerEvent(builder.build());
+        }
+
+        repo.waitForRollover();
+
+        // Build the query. Running it (in the search threads below) will ensure that an IndexSearcher is created and cached.
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
+        query.setMaxResults(100);
+
+        // Run the query in two background threads. When each thread goes to obtain the IndexSearcher, it will be delayed
+        // (3 seconds for the first searcher, 5 seconds for the second). That delay will occur as the main thread is updating
+        // the index. This should result in the search creating a new Index Reader that can properly query the index.
+        final int numThreads = 2;
+        final CountDownLatch performSearchLatch = new CountDownLatch(numThreads);
+        final Runnable searchRunnable = new Runnable() {
+            @Override
+            public void run() {
+                QueryResult result;
+                try {
+                    result = repo.queryEvents(query);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                    return;
+                }
+
+                System.out.println("Finished search: " + result);
+                performSearchLatch.countDown();
+            }
+        };
+
+        // Kick off the searcher threads
+        for (int i = 0; i < numThreads; i++) {
+            final Thread searchThread = new Thread(searchRunnable);
+            searchThread.start();
+        }
+
+        // Wait until we've obtained the Index Searchers before modifying the index.
+        obtainIndexSearcherLatch.await();
+
+        // add more events to the repo
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+            repo.registerEvent(builder.build());
+        }
+
+        // Force a rollover to occur. This will modify the index.
+        repo.rolloverWithLock(true);
+
+        // Wait for the repository to roll over.
+        repo.waitForRollover();
+
+        // Wait for the searches to complete.
+        performSearchLatch.await();
+    }
+
     @Test
     public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();