You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2016/10/06 17:19:07 UTC
[3/3] nifi git commit: NIFI-2452: Ensure that we do not close Index
Readers that are still in use
NIFI-2452: Ensure that we do not close Index Readers that are still in use
Signed-off-by: Joe Skora <js...@apache.org>
This closes #1072.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/db189e3b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/db189e3b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/db189e3b
Branch: refs/heads/0.x
Commit: db189e3b3df8d2d6f67e2548215a2b775b47cfb0
Parents: 83fdced
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Aug 1 14:51:02 2016 -0400
Committer: Joe Skora <js...@apache.org>
Committed: Thu Oct 6 13:17:44 2016 -0400
----------------------------------------------------------------------
.../nifi/provenance/IndexConfiguration.java | 3 +-
.../PersistentProvenanceRepository.java | 20 ++-
.../nifi/provenance/lucene/IndexManager.java | 55 +++++--
.../nifi/provenance/lucene/IndexSearch.java | 3 +-
.../nifi/provenance/lucene/LineageQuery.java | 1 +
.../TestPersistentProvenanceRepository.java | 163 +++++++++++++++++++
6 files changed, 223 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index 4e80811..af7bff5 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -212,13 +212,14 @@ public class IndexConfiguration {
final List<File> dirs = new ArrayList<>();
lock.lock();
try {
+ // Sort directories so that we return the newest index first
final List<File> sortedIndexDirectories = getIndexDirectories();
Collections.sort(sortedIndexDirectories, new Comparator<File>() {
@Override
public int compare(final File o1, final File o2) {
final long epochTimestamp1 = getIndexStartTime(o1);
final long epochTimestamp2 = getIndexStartTime(o2);
- return Long.compare(epochTimestamp1, epochTimestamp2);
+ return Long.compare(epochTimestamp2, epochTimestamp1);
}
});
http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index d7ab2d7..8b971b5 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -207,6 +207,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
rolloverExecutor = Executors.newScheduledThreadPool(numRolloverThreads, new NamedThreadFactory("Provenance Repository Rollover Thread"));
}
+ protected IndexManager getIndexManager() {
+ return indexManager;
+ }
+
@Override
public void initialize(final EventReporter eventReporter) throws IOException {
writeLock.lock();
@@ -641,7 +645,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
rolloverExecutor.shutdownNow();
queryExecService.shutdownNow();
- indexManager.close();
+ getIndexManager().close();
if ( writers != null ) {
for (final RecordWriter writer : writers) {
@@ -1006,7 +1010,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// we can safely delete the first index because the latest event in the index is an event
// that has already been expired from the repository.
final File indexingDirectory = indexDirs.get(0);
- indexManager.removeIndex(indexingDirectory);
+ getIndexManager().removeIndex(indexingDirectory);
indexConfig.removeIndexDirectory(indexingDirectory);
deleteDirectory(indexingDirectory);
@@ -1474,7 +1478,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
+ "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
- final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
+ final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, getIndexManager());
try {
deleteAction.execute(suggestedMergeFile);
} catch (final Exception e) {
@@ -1610,7 +1614,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final AtomicBoolean finishedAdding = new AtomicBoolean(false);
final List<Future<?>> futures = new ArrayList<>();
- final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
+ final IndexWriter indexWriter = getIndexManager().borrowIndexWriter(indexingDirectory);
try {
final ExecutorService exec = Executors.newFixedThreadPool(configuration.getIndexThreadPoolSize(), new ThreadFactory() {
@Override
@@ -1733,7 +1737,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}
} finally {
- indexManager.returnIndexWriter(indexingDirectory, indexWriter);
+ getIndexManager().returnIndexWriter(indexingDirectory, indexWriter);
}
indexConfig.setMaxIdIndexed(maxId);
@@ -1934,7 +1938,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
* @return an Iterator of ProvenanceEventRecord that match the query
* @throws IOException if unable to perform the query
*/
- public Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
+ Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
final List<File> indexFiles = indexConfig.getIndexDirectories();
final AtomicLong hits = new AtomicLong(0L);
@@ -2366,7 +2370,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
@Override
public void run() {
try {
- final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
+ final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, getIndexManager(), maxAttributeChars);
final StandardQueryResult queryResult = search.search(query, retrievalCount, firstEventTimestamp);
submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
if (queryResult.isFinished()) {
@@ -2408,7 +2412,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
try {
final Set<ProvenanceEventRecord> matchingRecords = LineageQuery.computeLineageForFlowFiles(PersistentProvenanceRepository.this,
- indexManager, indexDir, null, flowFileUuids, maxAttributeChars);
+ getIndexManager(), indexDir, null, flowFileUuids, maxAttributeChars);
final StandardLineageResult result = submission.getResult();
result.update(matchingRecords);
http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 57d0d78..07cd190 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -86,7 +87,7 @@ public class IndexManager implements Closeable {
public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.debug("Borrowing index writer for {}", indexingDirectory);
+ logger.trace("Borrowing index writer for {}", indexingDirectory);
lock.lock();
try {
@@ -124,6 +125,7 @@ public class IndexManager implements Closeable {
final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
if ( searchers != null ) {
for (final ActiveIndexSearcher activeSearcher : searchers) {
+ logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
activeSearcher.poison();
}
}
@@ -141,7 +143,7 @@ public class IndexManager implements Closeable {
public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
- logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+ logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
lock.lock();
try {
@@ -154,7 +156,7 @@ public class IndexManager implements Closeable {
writer.close();
} else if ( count.getCount() <= 1 ) {
// we are finished with this writer.
- logger.debug("Closing Index Writer for {}", indexingDirectory);
+ logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
count.close();
} else {
// decrement the count.
@@ -175,7 +177,7 @@ public class IndexManager implements Closeable {
public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
final File absoluteFile = indexDir.getAbsoluteFile();
- logger.debug("Borrowing index searcher for {}", indexDir);
+ logger.trace("Borrowing index searcher for {}", indexDir);
lock.lock();
try {
@@ -210,7 +212,8 @@ public class IndexManager implements Closeable {
continue;
}
- logger.debug("Providing previously cached index searcher for {}", indexDir);
+ final int referenceCount = searcher.incrementReferenceCount();
+ logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
return searcher.getSearcher();
}
}
@@ -219,7 +222,9 @@ public class IndexManager implements Closeable {
// from the cache so that we don't try to use them again later.
for ( final ActiveIndexSearcher searcher : expired ) {
try {
+ logger.debug("Closing {}", searcher);
searcher.close();
+ logger.trace("Closed {}", searcher);
} catch (final Exception e) {
logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
}
@@ -239,11 +244,14 @@ public class IndexManager implements Closeable {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
// we want to cache the searcher that we create, since it's just a reader.
- final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+ final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
currentlyCached.add(cached);
return cached.getSearcher();
} catch (final IOException e) {
+ logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
+ logger.error("", e);
+
try {
directory.close();
} catch (final IOException ioe) {
@@ -269,7 +277,7 @@ public class IndexManager implements Closeable {
// we don't want to cache this searcher because it's based on a writer, so we want to get
// new values the next time that we search.
- final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+ final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
currentlyCached.add(activeSearcher);
return activeSearcher.getSearcher();
@@ -282,7 +290,7 @@ public class IndexManager implements Closeable {
public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
final File absoluteFile = indexDirectory.getAbsoluteFile();
- logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+ logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
lock.lock();
try {
@@ -318,7 +326,8 @@ public class IndexManager implements Closeable {
return;
} else {
// the searcher is cached. Just leave it open.
- logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+ final int refCount = activeSearcher.decrementReferenceCount();
+ logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
return;
}
} else {
@@ -439,14 +448,17 @@ public class IndexManager implements Closeable {
private static class ActiveIndexSearcher implements Closeable {
private final IndexSearcher searcher;
private final DirectoryReader directoryReader;
+ private final File indexDirectory;
private final Directory directory;
private final boolean cache;
- private boolean poisoned = false;
+ private final AtomicInteger referenceCount = new AtomicInteger(1);
+ private volatile boolean poisoned = false;
- public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
+ public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
final Directory directory, final boolean cache) {
this.searcher = searcher;
this.directoryReader = directoryReader;
+ this.indexDirectory = indexDirectory;
this.directory = directory;
this.cache = cache;
}
@@ -467,9 +479,28 @@ public class IndexManager implements Closeable {
this.poisoned = true;
}
+ public int incrementReferenceCount() {
+ return referenceCount.incrementAndGet();
+ }
+
+ public int decrementReferenceCount() {
+ return referenceCount.decrementAndGet();
+ }
+
@Override
public void close() throws IOException {
- IndexManager.close(directoryReader, directory);
+ final int updatedRefCount = referenceCount.decrementAndGet();
+ if (updatedRefCount <= 0) {
+ logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
+ IndexManager.close(directoryReader, directory);
+ } else {
+ logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]";
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index b8661df..03a8df9 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -77,11 +77,12 @@ public class IndexSearch {
final long searchStartNanos = System.nanoTime();
final long openSearcherNanos = searchStartNanos - start;
+ logger.debug("Searching {} for {}", this, provenanceQuery);
final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
final long finishSearch = System.nanoTime();
final long searchNanos = finishSearch - searchStartNanos;
- logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
+ logger.debug("Searching {} for {} took {} millis; opening searcher took {} millis", this, provenanceQuery,
TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
if (topDocs.totalHits == 0) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index ce60e03..af738cd 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -90,6 +90,7 @@ public class LineageQuery {
}
final long searchStart = System.nanoTime();
+ logger.debug("Searching {} for {}", indexDirectory, query);
final TopDocs uuidQueryTopDocs = searcher.search(query, MAX_QUERY_RESULTS);
final long searchEnd = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/nifi/blob/db189e3b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index faaef87..0aa0d0f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -59,6 +60,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
+import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
@@ -82,6 +84,8 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestPersistentProvenanceRepository {
@@ -522,6 +526,165 @@ public class TestPersistentProvenanceRepository {
assertTrue(newRecordSet.getMatchingEvents().isEmpty());
}
+ // TODO: Switch to 10,000.
+ @Test(timeout = 1000000)
+ public void testModifyIndexWhileSearching() throws IOException, InterruptedException, ParseException {
+ final RepositoryConfiguration config = createConfiguration();
+ config.setMaxRecordLife(30, TimeUnit.SECONDS);
+ config.setMaxStorageCapacity(1024L * 1024L * 10);
+ config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+ config.setMaxEventFileCapacity(1024L * 1024L * 10);
+ config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+
+ final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2);
+ repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+ private IndexManager wrappedManager = null;
+
+ // Create an IndexManager that adds a delay before returning the Index Searcher.
+ @Override
+ protected synchronized IndexManager getIndexManager() {
+ if (wrappedManager == null) {
+ final IndexManager mgr = super.getIndexManager();
+ final Logger logger = LoggerFactory.getLogger("IndexManager");
+
+ wrappedManager = new IndexManager() {
+ final AtomicInteger indexSearcherCount = new AtomicInteger(0);
+
+ @Override
+ public IndexSearcher borrowIndexSearcher(File indexDir) throws IOException {
+ final IndexSearcher searcher = mgr.borrowIndexSearcher(indexDir);
+ final int idx = indexSearcherCount.incrementAndGet();
+ obtainIndexSearcherLatch.countDown();
+
+ // The first searcher should sleep for 3 seconds. The second searcher should
+ // sleep for 5 seconds. This allows us to have two threads each obtain a Searcher
+ // and then have one of them finish searching and close the searcher if it's poisoned while the
+ // second thread is still holding the searcher
+ try {
+ if (idx == 1) {
+ Thread.sleep(3000L);
+ } else {
+ Thread.sleep(5000L);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted", e);
+ }
+
+ logger.info("Releasing index searcher");
+ return searcher;
+ }
+
+ @Override
+ public IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException {
+ return mgr.borrowIndexWriter(indexingDirectory);
+ }
+
+ @Override
+ public void close() throws IOException {
+ mgr.close();
+ }
+
+ @Override
+ public void removeIndex(File indexDirectory) {
+ mgr.removeIndex(indexDirectory);
+ }
+
+ @Override
+ public void returnIndexSearcher(File indexDirectory, IndexSearcher searcher) {
+ mgr.returnIndexSearcher(indexDirectory, searcher);
+ }
+
+ @Override
+ public void returnIndexWriter(File indexingDirectory, IndexWriter writer) {
+ mgr.returnIndexWriter(indexingDirectory, writer);
+ }
+ };
+ }
+
+ return wrappedManager;
+ }
+ };
+
+ repo.initialize(getEventReporter());
+
+ final String uuid = "10000000-0000-0000-0000-000000000000";
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("abc", "xyz");
+ attributes.put("xyz", "abc");
+ attributes.put("filename", "file-" + uuid);
+
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ builder.setEventTime(System.currentTimeMillis());
+ builder.setEventType(ProvenanceEventType.RECEIVE);
+ builder.setTransitUri("nifi://unit-test");
+ attributes.put("uuid", uuid);
+ builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+
+ for (int i = 0; i < 10; i++) {
+ builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+ repo.registerEvent(builder.build());
+ }
+
+ repo.waitForRollover();
+
+ // Perform a query. This will ensure that an IndexSearcher is created and cached.
+ final Query query = new Query(UUID.randomUUID().toString());
+ query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
+ query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
+ query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
+ query.setMaxResults(100);
+
+ // Run a query in a background thread. When this thread goes to obtain the IndexSearcher, it will have a 5 second delay.
+ // That delay will occur as the main thread is updating the index. This should result in the search creating a new Index Reader
+ // that can properly query the index.
+ final int numThreads = 2;
+ final CountDownLatch performSearchLatch = new CountDownLatch(numThreads);
+ final Runnable searchRunnable = new Runnable() {
+ @Override
+ public void run() {
+ QueryResult result;
+ try {
+ result = repo.queryEvents(query);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail(e.toString());
+ return;
+ }
+
+ System.out.println("Finished search: " + result);
+ performSearchLatch.countDown();
+ }
+ };
+
+ // Kick off the searcher threads
+ for (int i = 0; i < numThreads; i++) {
+ final Thread searchThread = new Thread(searchRunnable);
+ searchThread.start();
+ }
+
+ // Wait until we've obtained the Index Searchers before modifying the index.
+ obtainIndexSearcherLatch.await();
+
+ // add more events to the repo
+ for (int i = 0; i < 10; i++) {
+ builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+ repo.registerEvent(builder.build());
+ }
+
+ // Force a rollover to occur. This will modify the index.
+ repo.rolloverWithLock(true);
+
+ // Wait for the repository to roll over.
+ repo.waitForRollover();
+
+ // Wait for the searches to complete.
+ performSearchLatch.await();
+ }
+
@Test
public void testIndexAndCompressOnRolloverAndSubsequentSearchMultipleStorageDirs() throws IOException, InterruptedException, ParseException {
final RepositoryConfiguration config = createConfiguration();