You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/04/28 16:04:55 UTC

[21/50] [abbrv] incubator-nifi git commit: NIFI-527: Code cleanup

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
index 8c266d1..0ffa5e6 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
@@ -25,9 +25,9 @@ public interface ExpirationAction {
      * Performs some action against the given File and returns the new File that
      * contains the modified version
      *
-     * @param expiredFile
-     * @return
-     * @throws IOException
+     * @param expiredFile the file that was expired
+     * @return the new file after the file has been renamed, or the expiredFile if the file was not renamed
+     * @throws IOException if there was an IO problem
      */
     File execute(File expiredFile) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 7db04aa..70bf36e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -49,9 +49,9 @@ public class DeleteIndexAction implements ExpirationAction {
         long numDeleted = 0;
         long maxEventId = -1L;
         try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
-        	maxEventId = reader.getMaxEventId();
+            maxEventId = reader.getMaxEventId();
         } catch (final IOException ioe) {
-        	logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
+            logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
         }
 
         // remove the records from the index
@@ -68,19 +68,19 @@ public class DeleteIndexAction implements ExpirationAction {
                 deleteDir = (docsLeft <= 0);
                 logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
             } finally {
-            	indexManager.returnIndexWriter(indexingDirectory, writer);
+                indexManager.returnIndexWriter(indexingDirectory, writer);
             }
 
             // we've confirmed that all documents have been removed. Delete the index directory.
             if (deleteDir) {
-            	indexManager.removeIndex(indexingDirectory);
+                indexManager.removeIndex(indexingDirectory);
                 indexConfiguration.removeIndexDirectory(indexingDirectory);
-                
+
                 deleteDirectory(indexingDirectory);
                 logger.info("Removed empty index directory {}", indexingDirectory);
             }
         }
-        
+
         // Update the minimum index to 1 more than the max Event ID in this file.
         if (maxEventId > -1L) {
             indexConfiguration.setMinIdIndexed(maxEventId + 1L);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 5a77f42..98137fb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -45,12 +45,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DocsReader {
-	private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
-	
+    private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+
     public DocsReader(final List<File> storageDirectories) {
     }
 
-    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
+    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
+            final AtomicInteger retrievalCount, final int maxResults) throws IOException {
         if (retrievalCount.get() >= maxResults) {
             return Collections.emptySet();
         }
@@ -73,42 +74,42 @@ public class DocsReader {
         return read(docs, allProvenanceLogFiles);
     }
 
-    
+
     private long getByteOffset(final Document d, final RecordReader reader) {
         final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
         if ( blockField != null ) {
-        	final int blockIndex = blockField.numericValue().intValue();
-        	final TocReader tocReader = reader.getTocReader();
-        	return tocReader.getBlockOffset(blockIndex);
+            final int blockIndex = blockField.numericValue().intValue();
+            final TocReader tocReader = reader.getTocReader();
+            return tocReader.getBlockOffset(blockIndex);
         }
-        
-    	return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+
+        return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
     }
-    
-    
+
+
     private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
-    	IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
-    	if ( blockField == null ) {
-    		reader.skipTo(getByteOffset(d, reader));
-    	} else {
-    		reader.skipToBlock(blockField.numericValue().intValue());
-    	}
-    	
+        IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+        if ( blockField == null ) {
+            reader.skipTo(getByteOffset(d, reader));
+        } else {
+            reader.skipToBlock(blockField.numericValue().intValue());
+        }
+
         StandardProvenanceEventRecord record;
         while ( (record = reader.nextRecord()) != null) {
-        	IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
-        	if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
-        		break;
-        	}
+            IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
+            if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+                break;
+            }
         }
-        
+
         if ( record == null ) {
-        	throw new IOException("Failed to find Provenance Event " + d);
+            throw new IOException("Failed to find Provenance Event " + d);
         } else {
-        	return record;
+            return record;
         }
     }
-    
+
 
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
         LuceneUtil.sortDocsForRetrieval(docs);
@@ -119,23 +120,23 @@ public class DocsReader {
 
         final long start = System.nanoTime();
         int logFileCount = 0;
-        
+
         final Set<String> storageFilesToSkip = new HashSet<>();
-        
+
         try {
             for (final Document d : docs) {
                 final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
                 if ( storageFilesToSkip.contains(storageFilename) ) {
-                	continue;
+                    continue;
                 }
-                
+
                 try {
                     if (reader != null && storageFilename.equals(lastStorageFilename)) {
-                       	matchingRecords.add(getRecord(d, reader));
+                        matchingRecords.add(getRecord(d, reader));
                     } else {
-                    	logger.debug("Opening log file {}", storageFilename);
-                    	
-                    	logFileCount++;
+                        logger.debug("Opening log file {}", storageFilename);
+
+                        logFileCount++;
                         if (reader != null) {
                             reader.close();
                         }
@@ -143,20 +144,20 @@ public class DocsReader {
                         List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
                             logger.warn("Could not find Provenance Log File with basename {} in the "
-                            		+ "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+                                    + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
                             storageFilesToSkip.add(storageFilename);
                             continue;
                         }
 
                         if (potentialFiles.size() > 1) {
-                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + 
-                            		storageFilename + " in the Provenance Repository");
+                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
+                                    storageFilename + " in the Provenance Repository");
                         }
 
                         for (final File file : potentialFiles) {
                             try {
-                            	reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
-                               	matchingRecords.add(getRecord(d, reader));
+                                reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
+                                matchingRecords.add(getRecord(d, reader));
                             } catch (final IOException e) {
                                 throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 3943504..9c3ec31 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -41,65 +41,65 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IndexManager implements Closeable {
-	private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-	
-	private final Lock lock = new ReentrantLock();
-	private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
-	private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
-	
-	
-	public void removeIndex(final File indexDirectory) {
-		final File absoluteFile = indexDirectory.getAbsoluteFile();
-		logger.info("Removing index {}", indexDirectory);
-		
-		lock.lock();
-		try {
-			final IndexWriterCount count = writerCounts.remove(absoluteFile);
-			if ( count != null ) {
-				try {
-					count.close();
-				} catch (final IOException ioe) {
-					logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
-					if ( logger.isDebugEnabled() ) {
-						logger.warn("", ioe);
-					}
-				}
-			}
-			
-			for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
-				for ( final ActiveIndexSearcher searcher : searcherList ) {
-					try {
-						searcher.close();
-					} catch (final IOException ioe) {
-						logger.warn("Failed to close Index Searcher {} for {} due to {}", 
-								searcher.getSearcher(), absoluteFile, ioe);
-						if ( logger.isDebugEnabled() ) {
-							logger.warn("", ioe);
-						}
-					}
-				}
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-	
-	public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
-		final File absoluteFile = indexingDirectory.getAbsoluteFile();
-		logger.debug("Borrowing index writer for {}", indexingDirectory);
-		
-		lock.lock();
-		try {
-			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-			if ( writerCount == null ) {
-				final List<Closeable> closeables = new ArrayList<>();
+    private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+
+    private final Lock lock = new ReentrantLock();
+    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+    private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+    public void removeIndex(final File indexDirectory) {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.info("Removing index {}", indexDirectory);
+
+        lock.lock();
+        try {
+            final IndexWriterCount count = writerCounts.remove(absoluteFile);
+            if ( count != null ) {
+                try {
+                    count.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
+                }
+            }
+
+            for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+                for ( final ActiveIndexSearcher searcher : searcherList ) {
+                    try {
+                        searcher.close();
+                    } catch (final IOException ioe) {
+                        logger.warn("Failed to close Index Searcher {} for {} due to {}",
+                                searcher.getSearcher(), absoluteFile, ioe);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.warn("", ioe);
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.debug("Borrowing index writer for {}", indexingDirectory);
+
+        lock.lock();
+        try {
+            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            if ( writerCount == null ) {
+                final List<Closeable> closeables = new ArrayList<>();
                 final Directory directory = FSDirectory.open(indexingDirectory);
                 closeables.add(directory);
-                
+
                 try {
-                	final Analyzer analyzer = new StandardAnalyzer();
-                	closeables.add(analyzer);
-                	
+                    final Analyzer analyzer = new StandardAnalyzer();
+                    closeables.add(analyzer);
+
                     final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
                     config.setWriteLockTimeout(300000L);
 
@@ -107,361 +107,361 @@ public class IndexManager implements Closeable {
                     writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
                     logger.debug("Providing new index writer for {}", indexingDirectory);
                 } catch (final IOException ioe) {
-                	for ( final Closeable closeable : closeables ) {
-                		try {
-                			closeable.close();
-                		} catch (final IOException ioe2) {
-                			ioe.addSuppressed(ioe2);
-                		}
-                	}
-                	
-                	throw ioe;
+                    for ( final Closeable closeable : closeables ) {
+                        try {
+                            closeable.close();
+                        } catch (final IOException ioe2) {
+                            ioe.addSuppressed(ioe2);
+                        }
+                    }
+
+                    throw ioe;
                 }
-                
+
                 writerCounts.put(absoluteFile, writerCount);
-			} else {
-				logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
-				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-			}
-			
-			return writerCount.getWriter();
-		} finally {
-			lock.unlock();
-		}
-	}
-	
-	public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
-		final File absoluteFile = indexingDirectory.getAbsoluteFile();
-		logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
-		
-		lock.lock();
-		try {
-			IndexWriterCount count = writerCounts.remove(absoluteFile);
-			
-			try {
-				if ( count == null ) {
-					logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
-							+ "This could potentially lead to a resource leak", writer, indexingDirectory);
-					writer.close();
-				} else if ( count.getCount() <= 1 ) {
-					// we are finished with this writer.
-					logger.debug("Closing Index Writer for {}", indexingDirectory);
-					count.close();
-				} else {
-					// decrement the count.
-					logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
-					writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
-				}
-			} catch (final IOException ioe) {
-				logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
-				if ( logger.isDebugEnabled() ) {
-					logger.warn("", ioe);
-				}
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	
-	public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
-		final File absoluteFile = indexDir.getAbsoluteFile();
-		logger.debug("Borrowing index searcher for {}", indexDir);
-		
-		lock.lock();
-		try {
-			// check if we already have a reader cached.
-			List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-			if ( currentlyCached == null ) {
-				currentlyCached = new ArrayList<>();
-				activeSearchers.put(absoluteFile, currentlyCached);
-			} else {
-				// keep track of any searchers that have been closed so that we can remove them
-				// from our cache later.
-				final Set<ActiveIndexSearcher> expired = new HashSet<>();
-				
-				try {
-					for ( final ActiveIndexSearcher searcher : currentlyCached ) {
-						if ( searcher.isCache() ) {
-							final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
-							if ( refCount <= 0 ) {
-								// if refCount == 0, then the reader has been closed, so we need to discard the searcher
-								logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
-									+ "removing cached searcher", absoluteFile, refCount);
-								expired.add(searcher);
-								continue;
-							}
-							
-							logger.debug("Providing previously cached index searcher for {}", indexDir);
-							return searcher.getSearcher();
-						}
-					}
-				} finally {
-					// if we have any expired index searchers, we need to close them and remove them
-					// from the cache so that we don't try to use them again later.
-					for ( final ActiveIndexSearcher searcher : expired ) {
-						try {
-							searcher.close();
-						} catch (final Exception e) {
-							logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
-						}
-						
-						currentlyCached.remove(searcher);
-					}
-				}
-			}
-			
-			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-			if ( writerCount == null ) {
-				final Directory directory = FSDirectory.open(absoluteFile);
-				logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
-				
-				try {
-					final DirectoryReader directoryReader = DirectoryReader.open(directory);
-					final IndexSearcher searcher = new IndexSearcher(directoryReader);
-					
-					// we want to cache the searcher that we create, since it's just a reader.
-					final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
-					currentlyCached.add(cached);
-					
-					return cached.getSearcher();
-				} catch (final IOException e) {
-					try {
-						directory.close();
-					} catch (final IOException ioe) {
-						e.addSuppressed(ioe);
-					}
-					
-					throw e;
-				}
-			} else {
-				logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
-						+ "counter to {}", indexDir, writerCount.getCount() + 1);
-
-				// increment the writer count to ensure that it's kept open.
-				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-				
-				// create a new Index Searcher from the writer so that we don't have an issue with trying
-				// to read from a directory that's locked. If we get the "no segments* file found" with
-				// Lucene, this indicates that an IndexWriter already has the directory open.
-				final IndexWriter writer = writerCount.getWriter();
-				final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
-				final IndexSearcher searcher = new IndexSearcher(directoryReader);
-				
-				// we don't want to cache this searcher because it's based on a writer, so we want to get
-				// new values the next time that we search.
-				final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
-				
-				currentlyCached.add(activeSearcher);
-				return activeSearcher.getSearcher();
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-	
-	
-	public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
-		final File absoluteFile = indexDirectory.getAbsoluteFile();
-		logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
-		
-		lock.lock();
-		try {
-			// check if we already have a reader cached.
-			List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-			if ( currentlyCached == null ) {
-				logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
-						+ "result in a resource leak", indexDirectory);
-				return;
-			}
-			
-			final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
-			while (itr.hasNext()) {
-				final ActiveIndexSearcher activeSearcher = itr.next();
-				if ( activeSearcher.getSearcher().equals(searcher) ) {
-					if ( activeSearcher.isCache() ) {
-						// the searcher is cached. Just leave it open.
-						logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
-						return;
-					} else {
-						// searcher is not cached. It was created from a writer, and we want
-						// the newest updates the next time that we get a searcher, so we will
-						// go ahead and close this one out.
-						itr.remove();
-						
-						// decrement the writer count because we incremented it when creating the searcher
-						final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-						if ( writerCount != null ) {
-							if ( writerCount.getCount() <= 1 ) {
-								try {
-									logger.debug("Index searcher for {} is not cached. Writer count is "
-											+ "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
-									
-									writerCount.close();
-								} catch (final IOException ioe) {
-									logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
-									if ( logger.isDebugEnabled() ) {
-										logger.warn("", ioe);
-									}
-								}
-							} else {
-								logger.debug("Index searcher for {} is not cached. Writer count is decremented "
-										+ "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
-								
-								writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-									writerCount.getAnalyzer(), writerCount.getDirectory(), 
-									writerCount.getCount() - 1));
-							}
-						}
-
-						try {
-							logger.debug("Closing Index Searcher for {}", indexDirectory);
-							activeSearcher.close();
-						} catch (final IOException ioe) {
-							logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
-							if ( logger.isDebugEnabled() ) {
-								logger.warn("", ioe);
-							}
-						}
-					}
-				}
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-	
-	@Override
-	public void close() throws IOException {
-		logger.debug("Closing Index Manager");
-		
-		lock.lock();
-		try {
-			IOException ioe = null;
-			
-			for ( final IndexWriterCount count : writerCounts.values() ) {
-				try {
-					count.close();
-				} catch (final IOException e) {
-					if ( ioe == null ) {
-						ioe = e;
-					} else {
-						ioe.addSuppressed(e);
-					}
-				}
-			}
-			
-			for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
-				for (final ActiveIndexSearcher searcher : searcherList) {
-					try {
-						searcher.close();
-					} catch (final IOException e) {
-						if ( ioe == null ) {
-							ioe = e;
-						} else {
-							ioe.addSuppressed(e);
-						}
-					}
-				}
-			}
-			
-			if ( ioe != null ) {
-				throw ioe;
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	
-	private static void close(final Closeable... closeables) throws IOException {
-		IOException ioe = null;
-		for ( final Closeable closeable : closeables ) {
-			if ( closeable == null ) {
-				continue;
-			}
-			
-			try {
-				closeable.close();
-			} catch (final IOException e) {
-				if ( ioe == null ) {
-					ioe = e;
-				} else {
-					ioe.addSuppressed(e);
-				}
-			}
-		}
-		
-		if ( ioe != null ) {
-			throw ioe;
-		}
-	}
-	
-	
-	private static class ActiveIndexSearcher implements Closeable {
-		private final IndexSearcher searcher;
-		private final DirectoryReader directoryReader;
-		private final Directory directory;
-		private final boolean cache;
-		
-		public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, 
-				Directory directory, final boolean cache) {
-			this.searcher = searcher;
-			this.directoryReader = directoryReader;
-			this.directory = directory;
-			this.cache = cache;
-		}
-
-		public boolean isCache() {
-			return cache;
-		}
-
-		public IndexSearcher getSearcher() {
-			return searcher;
-		}
-		
-		@Override
-		public void close() throws IOException {
-			IndexManager.close(directoryReader, directory);
-		}
-	}
-	
-	
-	private static class IndexWriterCount implements Closeable {
-		private final IndexWriter writer;
-		private final Analyzer analyzer;
-		private final Directory directory;
-		private final int count;
-		
-		public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
-			this.writer = writer;
-			this.analyzer = analyzer;
-			this.directory = directory;
-			this.count = count;
-		}
-
-		public Analyzer getAnalyzer() {
-			return analyzer;
-		}
-
-		public Directory getDirectory() {
-			return directory;
-		}
-
-		public IndexWriter getWriter() {
-			return writer;
-		}
-
-		public int getCount() {
-			return count;
-		}
-
-		@Override
-		public void close() throws IOException {
-			IndexManager.close(writer, analyzer, directory);
-		}
-	}
+            } else {
+                logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+            }
+
+            return writerCount.getWriter();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+        lock.lock();
+        try {
+            IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+            try {
+                if ( count == null ) {
+                    logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+                            + "This could potentially lead to a resource leak", writer, indexingDirectory);
+                    writer.close();
+                } else if ( count.getCount() <= 1 ) {
+                    // we are finished with this writer.
+                    logger.debug("Closing Index Writer for {}", indexingDirectory);
+                    count.close();
+                } else {
+                    // decrement the count.
+                    logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+                    writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+                }
+            } catch (final IOException ioe) {
+                logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+                if ( logger.isDebugEnabled() ) {
+                    logger.warn("", ioe);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+        final File absoluteFile = indexDir.getAbsoluteFile();
+        logger.debug("Borrowing index searcher for {}", indexDir);
+
+        lock.lock();
+        try {
+            // check if we already have a reader cached.
+            List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+            if ( currentlyCached == null ) {
+                currentlyCached = new ArrayList<>();
+                activeSearchers.put(absoluteFile, currentlyCached);
+            } else {
+                // keep track of any searchers that have been closed so that we can remove them
+                // from our cache later.
+                final Set<ActiveIndexSearcher> expired = new HashSet<>();
+
+                try {
+                    for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+                        if ( searcher.isCache() ) {
+                            final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+                            if ( refCount <= 0 ) {
+                                // if refCount == 0, then the reader has been closed, so we need to discard the searcher
+                                logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+                                        + "removing cached searcher", absoluteFile, refCount);
+                                expired.add(searcher);
+                                continue;
+                            }
+
+                            logger.debug("Providing previously cached index searcher for {}", indexDir);
+                            return searcher.getSearcher();
+                        }
+                    }
+                } finally {
+                    // if we have any expired index searchers, we need to close them and remove them
+                    // from the cache so that we don't try to use them again later.
+                    for ( final ActiveIndexSearcher searcher : expired ) {
+                        try {
+                            searcher.close();
+                        } catch (final Exception e) {
+                            logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
+                        }
+
+                        currentlyCached.remove(searcher);
+                    }
+                }
+            }
+
+            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            if ( writerCount == null ) {
+                final Directory directory = FSDirectory.open(absoluteFile);
+                logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+                try {
+                    final DirectoryReader directoryReader = DirectoryReader.open(directory);
+                    final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+                    // we want to cache the searcher that we create, since it's just a reader.
+                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+                    currentlyCached.add(cached);
+
+                    return cached.getSearcher();
+                } catch (final IOException e) {
+                    try {
+                        directory.close();
+                    } catch (final IOException ioe) {
+                        e.addSuppressed(ioe);
+                    }
+
+                    throw e;
+                }
+            } else {
+                logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+                        + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+                // increment the writer count to ensure that it's kept open.
+                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+                // create a new Index Searcher from the writer so that we don't have an issue with trying
+                // to read from a directory that's locked. If we get the "no segments* file found" with
+                // Lucene, this indicates that an IndexWriter already has the directory open.
+                final IndexWriter writer = writerCount.getWriter();
+                final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+                final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+                // we don't want to cache this searcher because it's based on a writer, so we want to get
+                // new values the next time that we search.
+                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+
+                currentlyCached.add(activeSearcher);
+                return activeSearcher.getSearcher();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+
+        lock.lock();
+        try {
+            // check if we already have a reader cached.
+            List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+            if ( currentlyCached == null ) {
+                logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+                        + "result in a resource leak", indexDirectory);
+                return;
+            }
+
+            final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+            while (itr.hasNext()) {
+                final ActiveIndexSearcher activeSearcher = itr.next();
+                if ( activeSearcher.getSearcher().equals(searcher) ) {
+                    if ( activeSearcher.isCache() ) {
+                        // the searcher is cached. Just leave it open.
+                        logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+                        return;
+                    } else {
+                        // searcher is not cached. It was created from a writer, and we want
+                        // the newest updates the next time that we get a searcher, so we will
+                        // go ahead and close this one out.
+                        itr.remove();
+
+                        // decrement the writer count because we incremented it when creating the searcher
+                        final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+                        if ( writerCount != null ) {
+                            if ( writerCount.getCount() <= 1 ) {
+                                try {
+                                    logger.debug("Index searcher for {} is not cached. Writer count is "
+                                            + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+                                    writerCount.close();
+                                } catch (final IOException ioe) {
+                                    logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+                                    if ( logger.isDebugEnabled() ) {
+                                        logger.warn("", ioe);
+                                    }
+                                }
+                            } else {
+                                logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+                                        + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+                                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                                        writerCount.getAnalyzer(), writerCount.getDirectory(),
+                                        writerCount.getCount() - 1));
+                            }
+                        }
+
+                        try {
+                            logger.debug("Closing Index Searcher for {}", indexDirectory);
+                            activeSearcher.close();
+                        } catch (final IOException ioe) {
+                            logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.warn("", ioe);
+                            }
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        logger.debug("Closing Index Manager");
+
+        lock.lock();
+        try {
+            IOException ioe = null;
+
+            for ( final IndexWriterCount count : writerCounts.values() ) {
+                try {
+                    count.close();
+                } catch (final IOException e) {
+                    if ( ioe == null ) {
+                        ioe = e;
+                    } else {
+                        ioe.addSuppressed(e);
+                    }
+                }
+            }
+
+            for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+                for (final ActiveIndexSearcher searcher : searcherList) {
+                    try {
+                        searcher.close();
+                    } catch (final IOException e) {
+                        if ( ioe == null ) {
+                            ioe = e;
+                        } else {
+                            ioe.addSuppressed(e);
+                        }
+                    }
+                }
+            }
+
+            if ( ioe != null ) {
+                throw ioe;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    private static void close(final Closeable... closeables) throws IOException {
+        IOException ioe = null;
+        for ( final Closeable closeable : closeables ) {
+            if ( closeable == null ) {
+                continue;
+            }
+
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                if ( ioe == null ) {
+                    ioe = e;
+                } else {
+                    ioe.addSuppressed(e);
+                }
+            }
+        }
+
+        if ( ioe != null ) {
+            throw ioe;
+        }
+    }
+
+
+    private static class ActiveIndexSearcher implements Closeable {
+        private final IndexSearcher searcher;
+        private final DirectoryReader directoryReader;
+        private final Directory directory;
+        private final boolean cache;
+
+        public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
+                Directory directory, final boolean cache) {
+            this.searcher = searcher;
+            this.directoryReader = directoryReader;
+            this.directory = directory;
+            this.cache = cache;
+        }
+
+        public boolean isCache() {
+            return cache;
+        }
+
+        public IndexSearcher getSearcher() {
+            return searcher;
+        }
+
+        @Override
+        public void close() throws IOException {
+            IndexManager.close(directoryReader, directory);
+        }
+    }
+
+
+    private static class IndexWriterCount implements Closeable {
+        private final IndexWriter writer;
+        private final Analyzer analyzer;
+        private final Directory directory;
+        private final int count;
+
+        public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+            this.writer = writer;
+            this.analyzer = analyzer;
+            this.directory = directory;
+            this.count = count;
+        }
+
+        public Analyzer getAnalyzer() {
+            return analyzer;
+        }
+
+        public Directory getDirectory() {
+            return directory;
+        }
+
+        public IndexWriter getWriter() {
+            return writer;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        @Override
+        public void close() throws IOException {
+            IndexManager.close(writer, analyzer, directory);
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index dcb6e08..53869f4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IndexSearch {
-	private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
+    private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
     private final PersistentProvenanceRepository repository;
     private final File indexDirectory;
     private final IndexManager indexManager;
@@ -65,17 +65,17 @@ public class IndexSearch {
         final long start = System.nanoTime();
         IndexSearcher searcher = null;
         try {
-        	searcher = indexManager.borrowIndexSearcher(indexDirectory);
+            searcher = indexManager.borrowIndexSearcher(indexDirectory);
             final long searchStartNanos = System.nanoTime();
             final long openSearcherNanos = searchStartNanos - start;
-            
+
             final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
             final long finishSearch = System.nanoTime();
             final long searchNanos = finishSearch - searchStartNanos;
-            
-            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this, 
-            		TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
-            
+
+            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
+                    TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+
             if (topDocs.totalHits == 0) {
                 sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
                 return sqr;
@@ -83,31 +83,31 @@ public class IndexSearch {
 
             final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
             matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-            
+
             final long readRecordsNanos = System.nanoTime() - finishSearch;
             logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
-            
+
             sqr.update(matchingRecords, topDocs.totalHits);
             return sqr;
         } catch (final FileNotFoundException e) {
             // nothing has been indexed yet, or the data has already aged off
-        	logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
-        	if ( logger.isDebugEnabled() ) {
-        		logger.warn("", e);
-        	}
-        	
+            logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
+            if ( logger.isDebugEnabled() ) {
+                logger.warn("", e);
+            }
+
             sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
             return sqr;
         } finally {
-        	if ( searcher != null ) {
-        		indexManager.returnIndexSearcher(indexDirectory, searcher);
-        	}
+            if ( searcher != null ) {
+                indexManager.returnIndexSearcher(indexDirectory, searcher);
+            }
         }
     }
 
-    
+
     @Override
     public String toString() {
-    	return "IndexSearcher[" + indexDirectory + "]";
+        return "IndexSearcher[" + indexDirectory + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index 5e87913..46be391 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -16,50 +16,30 @@
  */
 package org.apache.nifi.provenance.lucene;
 
-import java.io.EOFException;
-import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.IntField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.IndexConfiguration;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.rollover.RolloverAction;
 import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class IndexingAction implements RolloverAction {
-
-    private final PersistentProvenanceRepository repository;
+public class IndexingAction {
     private final Set<SearchableField> nonAttributeSearchableFields;
     private final Set<SearchableField> attributeSearchableFields;
-    private final IndexConfiguration indexConfiguration;
-    private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
-
-    public IndexingAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfig) {
-        repository = repo;
-        indexConfiguration = indexConfig;
 
+    public IndexingAction(final PersistentProvenanceRepository repo) {
         attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableAttributes()));
         nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableFields()));
     }
@@ -72,7 +52,7 @@ public class IndexingAction implements RolloverAction {
         doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
     }
 
-    
+
     public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
         final Map<String, String> attributes = record.getAttributes();
 
@@ -105,14 +85,14 @@ public class IndexingAction implements RolloverAction {
             doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
             doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
             doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
-            
+
             if ( blockIndex == null ) {
-            	doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
+                doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
             } else {
-	            doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
-	            doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
+                doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
+                doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
             }
-            
+
             for (final String lineageIdentifier : record.getLineageIdentifiers()) {
                 addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
             }
@@ -150,87 +130,4 @@ public class IndexingAction implements RolloverAction {
             indexWriter.addDocument(doc);
         }
     }
-    
-    @Override
-    public File execute(final File fileRolledOver) throws IOException {
-        final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver);
-        int indexCount = 0;
-        long maxId = -1L;
-
-        try (final Directory directory = FSDirectory.open(indexingDirectory);
-                final Analyzer analyzer = new StandardAnalyzer()) {
-
-            final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
-            config.setWriteLockTimeout(300000L);
-
-            try (final IndexWriter indexWriter = new IndexWriter(directory, config);
-                    final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
-                StandardProvenanceEventRecord record;
-                while (true) {
-                	final Integer blockIndex;
-                	if ( reader.isBlockIndexAvailable() ) {
-                		blockIndex = reader.getBlockIndex();
-                	} else {
-                		blockIndex = null;
-                	}
-                	
-                    try {
-                        record = reader.nextRecord();
-                    } catch (final EOFException eof) {
-                        // system was restarted while writing to the log file. Nothing we can do here, so ignore this record.
-                        // On system restart, the FlowFiles should be back in their "original" queues, so the events will be re-created
-                        // when the data is re-processed
-                        break;
-                    }
-
-                    if (record == null) {
-                        break;
-                    }
-
-                    maxId = record.getEventId();
-
-                    index(record, indexWriter, blockIndex);
-                    indexCount++;
-                }
-
-                indexWriter.commit();
-            } catch (final EOFException eof) {
-                // nothing in the file. Move on.
-            }
-        } finally {
-            if (maxId >= -1) {
-                indexConfiguration.setMaxIdIndexed(maxId);
-            }
-        }
-
-        final File newFile = new File(fileRolledOver.getParent(),
-                LuceneUtil.substringBeforeLast(fileRolledOver.getName(), ".")
-                + ".indexed."
-                + LuceneUtil.substringAfterLast(fileRolledOver.getName(), "."));
-
-        boolean renamed = false;
-        for (int i = 0; i < 10 && !renamed; i++) {
-            renamed = fileRolledOver.renameTo(newFile);
-            if (!renamed) {
-                try {
-                    Thread.sleep(25L);
-                } catch (final InterruptedException e) {
-                }
-            }
-        }
-
-        if (renamed) {
-            logger.info("Finished indexing Provenance Log File {} to index {} with {} records indexed and renamed file to {}",
-                    fileRolledOver, indexingDirectory, indexCount, newFile);
-            return newFile;
-        } else {
-            logger.warn("Finished indexing Provenance Log File {} with {} records indexed but failed to rename file to {}; indexed {} records", new Object[]{fileRolledOver, indexCount, newFile, indexCount});
-            return fileRolledOver;
-        }
-    }
-
-    @Override
-    public boolean hasBeenPerformed(final File fileRolledOver) {
-        return fileRolledOver.getName().contains(".indexed.");
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index 54cde15..3f75c00 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -48,7 +48,8 @@ public class LineageQuery {
     public static final int MAX_LINEAGE_UUIDS = 100;
     private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);
 
-    public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
+    public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory,
+            final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
         if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
             throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
         }
@@ -99,7 +100,8 @@ public class LineageQuery {
             final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
             final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
             final long readDocsEnd = System.nanoTime();
-            logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
+            logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis",
+                    TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
 
             return recs;
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index 59dc10b..c622ea1 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -78,16 +78,16 @@ public class LuceneUtil {
         final String searchString = baseName + ".";
         for (final Path path : allProvenanceLogs) {
             if (path.toFile().getName().startsWith(searchString)) {
-            	final File file = path.toFile();
-            	if ( file.exists() ) {
-            		matchingFiles.add(file);
-            	} else {
-            		final File dir = file.getParentFile();
-            		final File gzFile = new File(dir, file.getName() + ".gz");
-            		if ( gzFile.exists() ) {
-            			matchingFiles.add(gzFile);
-            		}
-            	}
+                final File file = path.toFile();
+                if ( file.exists() ) {
+                    matchingFiles.add(file);
+                } else {
+                    final File dir = file.getParentFile();
+                    final File gzFile = new File(dir, file.getName() + ".gz");
+                    if ( gzFile.exists() ) {
+                        matchingFiles.add(gzFile);
+                    }
+                }
             }
         }
 
@@ -144,16 +144,16 @@ public class LuceneUtil {
                 final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX);
                 final IndexableField fileOffset2 = o1.getField(FieldNames.BLOCK_INDEX);
                 if ( fileOffset1 != null && fileOffset2 != null ) {
-                	final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
-                	if ( blockIndexResult != 0 ) {
-                		return blockIndexResult;
-                	}
-                	
-                	final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
-                	final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
-                	return Long.compare(eventId1, eventId2);
+                    final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
+                    if ( blockIndexResult != 0 ) {
+                        return blockIndexResult;
+                    }
+
+                    final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                    final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                    return Long.compare(eventId1, eventId2);
                 }
-                
+
                 final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 return Long.compare(offset1, offset2);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
deleted file mode 100644
index d014618..0000000
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.rollover;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.provenance.lucene.IndexingAction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompressionAction implements RolloverAction {
-
-    private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
-
-    @Override
-    public File execute(final File fileRolledOver) throws IOException {
-        final File gzFile = new File(fileRolledOver.getParent(), fileRolledOver.getName() + ".gz");
-        try (final FileInputStream in = new FileInputStream(fileRolledOver);
-                final OutputStream fos = new FileOutputStream(gzFile);
-                final GZIPOutputStream gzipOut = new GZIPOutputStream(fos, 1)) {
-            StreamUtils.copy(in, gzipOut);
-            in.getFD().sync();
-        }
-
-        boolean deleted = false;
-        for (int i = 0; i < 10 && !deleted; i++) {
-            deleted = fileRolledOver.delete();
-        }
-
-        logger.info("Finished compressing Provenance Log File {}", fileRolledOver);
-        return gzFile;
-    }
-
-    @Override
-    public boolean hasBeenPerformed(final File fileRolledOver) {
-        return fileRolledOver.getName().contains(".gz");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
deleted file mode 100644
index 33401e9..0000000
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.rollover;
-
-import java.io.File;
-import java.io.IOException;
-
-public interface RolloverAction {
-
-    /**
-     * Performs some action against the given File and returns the new File that
-     * contains the modified version
-     *
-     * @param fileRolledOver
-     * @return
-     * @throws IOException
-     */
-    File execute(File fileRolledOver) throws IOException;
-
-    boolean hasBeenPerformed(File fileRolledOver);
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 8bdc88a..91c8222 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -24,75 +24,80 @@ import org.apache.nifi.provenance.toc.TocReader;
 
 public interface RecordReader extends Closeable {
 
-	/**
-	 * Returns the next record in the reader, or <code>null</code> if there is no more data available.
-	 * @return
-	 * @throws IOException
-	 */
+    /**
+     * Returns the next record in the reader, or <code>null</code> if there is no more data available.
+     * @return the next Provenance event in the stream
+     * @throws IOException if unable to read the next event from the stream
+     */
     StandardProvenanceEventRecord nextRecord() throws IOException;
 
     /**
      * Skips the specified number of bytes
-     * @param bytesToSkip
-     * @throws IOException
+     * @param bytesToSkip the number of bytes to skip ahead
+     * @throws IOException if unable to skip ahead the specified number of bytes (e.g., the stream does
+     *  not contain this many more bytes)
      */
     void skip(long bytesToSkip) throws IOException;
 
     /**
      * Skips to the specified byte offset in the underlying stream.
-     * @param position
+     * @param position the byte offset to skip to
      * @throws IOException if the underlying stream throws IOException, or if the reader has already
      * passed the specified byte offset
      */
     void skipTo(long position) throws IOException;
-    
+
     /**
      * Skips to the specified compression block
-     * 
-     * @param blockIndex
+     *
+     * @param blockIndex the byte index to skip to
      * @throws IOException if the underlying stream throws IOException, or if the reader has already
      * read passed the specified compression block index
      * @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it
      */
     void skipToBlock(int blockIndex) throws IOException;
-    
+
     /**
      * Returns the block index that the Reader is currently reading from.
      * Note that the block index is incremented at the beginning of the {@link #nextRecord()}
-     * method. This means that this method will return the block from which the previous record was read, 
+     * method. This means that this method will return the block from which the previous record was read,
      * if calling {@link #nextRecord()} continually, not the block from which the next record will be read.
-     * @return
+     *
+     * @return the current block index
+     * @throws IllegalStateException if the reader is reading a provenance event file that does not contain
+     * a Table of Contents
      */
     int getBlockIndex();
-    
+
     /**
      * Returns <code>true</code> if the compression block index is available. It will be available
      * if and only if the reader is created with a TableOfContents
-     * 
-     * @return
+     *
+     * @return true if the reader is reading from an event file that has a Table of Contents
      */
     boolean isBlockIndexAvailable();
-    
+
     /**
      * Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists,
      * <code>null</code> otherwise
-     * @return
+     *
+     * @return the TocReader if the underlying event file has an Table of Contents, <code>null</code> otherwise.
      */
     TocReader getTocReader();
-    
+
     /**
-     * Returns the number of bytes that have been consumed from the stream (read or skipped).
-     * @return
+     * @return the number of bytes that have been consumed from the stream (read or skipped).
      */
     long getBytesConsumed();
-    
+
     /**
      * Returns the ID of the last event in this record reader, or -1 if the reader has no records or
      * has already read through all records. Note: This method will consume the stream until the end,
      * so no more records will be available on this reader after calling this method.
-     * 
-     * @return
-     * @throws IOException
+     *
+     * @return the ID of the last event in this record reader, or -1 if the reader has no records or
+     * has already read through all records
+     * @throws IOException if unable to get id of the last event
      */
     long getMaxEventId() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index dff281c..cab5e6f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -37,75 +37,75 @@ public class RecordReaders {
         InputStream fis = null;
 
         try {
-	        if (!file.exists()) {
-	            if (provenanceLogFiles != null) {
-		            final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
-		            for (final Path path : provenanceLogFiles) {
-		                if (path.toFile().getName().startsWith(baseName)) {
-		                    file = path.toFile();
-		                    break;
-		                }
-		            }
-	            }
-	        }
-	
-	        if ( file.exists() ) {
-	            try {
-	                fis = new FileInputStream(file);
-	            } catch (final FileNotFoundException fnfe) {
-	                fis = null;
-	            }
-	        }
-	        
-	        String filename = file.getName();
-	        openStream: while ( fis == null ) {
-	            final File dir = file.getParentFile();
-	            final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
-	            
-	            // depending on which rollover actions have occurred, we could have 3 possibilities for the
-	            // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
-	            // because most often we are compressing on rollover and most often we have already finished
-	            // compressing by the time that we are querying the data.
-	            for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
-	                file = new File(dir, baseName + extension);
-	                if ( file.exists() ) {
-	                    try {
-	                        fis = new FileInputStream(file);
-	                        filename = baseName + extension;
-	                        break openStream;
-	                    } catch (final FileNotFoundException fnfe) {
-	                        // file was modified by a RolloverAction after we verified that it exists but before we could
-	                        // create an InputStream for it. Start over.
-	                        fis = null;
-	                        continue openStream;
-	                    }
-	                }
-	            }
-	            
-	            break;
-	        }
-	
-	        if ( fis == null ) {
-	            throw new FileNotFoundException("Unable to locate file " + originalFile);
-	        }
-	
-	    	final File tocFile = TocUtil.getTocFile(file);
-	    	if ( tocFile.exists() ) {
-	    		final TocReader tocReader = new StandardTocReader(tocFile);
-	    		return new StandardRecordReader(fis, filename, tocReader);
-	    	} else {
-	    		return new StandardRecordReader(fis, filename);
-	    	}
+            if (!file.exists()) {
+                if (provenanceLogFiles != null) {
+                    final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
+                    for (final Path path : provenanceLogFiles) {
+                        if (path.toFile().getName().startsWith(baseName)) {
+                            file = path.toFile();
+                            break;
+                        }
+                    }
+                }
+            }
+
+            if ( file.exists() ) {
+                try {
+                    fis = new FileInputStream(file);
+                } catch (final FileNotFoundException fnfe) {
+                    fis = null;
+                }
+            }
+
+            String filename = file.getName();
+            openStream: while ( fis == null ) {
+                final File dir = file.getParentFile();
+                final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+
+                // depending on which rollover actions have occurred, we could have 3 possibilities for the
+                // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
+                // because most often we are compressing on rollover and most often we have already finished
+                // compressing by the time that we are querying the data.
+                for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+                    file = new File(dir, baseName + extension);
+                    if ( file.exists() ) {
+                        try {
+                            fis = new FileInputStream(file);
+                            filename = baseName + extension;
+                            break openStream;
+                        } catch (final FileNotFoundException fnfe) {
+                            // file was modified by a RolloverAction after we verified that it exists but before we could
+                            // create an InputStream for it. Start over.
+                            fis = null;
+                            continue openStream;
+                        }
+                    }
+                }
+
+                break;
+            }
+
+            if ( fis == null ) {
+                throw new FileNotFoundException("Unable to locate file " + originalFile);
+            }
+
+            final File tocFile = TocUtil.getTocFile(file);
+            if ( tocFile.exists() ) {
+                final TocReader tocReader = new StandardTocReader(tocFile);
+                return new StandardRecordReader(fis, filename, tocReader);
+            } else {
+                return new StandardRecordReader(fis, filename);
+            }
         } catch (final IOException ioe) {
-        	if ( fis != null ) {
-        		try {
-        			fis.close();
-        		} catch (final IOException inner) {
-        			ioe.addSuppressed(inner);
-        		}
-        	}
-        	
-        	throw ioe;
+            if ( fis != null ) {
+                try {
+                    fis.close();
+                } catch (final IOException inner) {
+                    ioe.addSuppressed(inner);
+                }
+            }
+
+            throw ioe;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index 58f4dc2..d89fd6f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -28,31 +28,27 @@ public interface RecordWriter extends Closeable {
     /**
      * Writes header information to the underlying stream
      *
-     * @throws IOException
+     * @throws IOException if unable to write header information to the underlying stream
      */
     void writeHeader() throws IOException;
 
     /**
      * Writes the given record out to the underlying stream
      *
-     * @param record
-     * @param recordIdentifier
+     * @param record the record to write
+     * @param recordIdentifier the new identifier of the record
      * @return the number of bytes written for the given records
-     * @throws IOException
+     * @throws IOException if unable to write the record to the stream
      */
     long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException;
 
     /**
-     * Returns the number of Records that have been written to this RecordWriter
-     *
-     * @return
+     * @return the number of Records that have been written to this RecordWriter
      */
     int getRecordsWritten();
 
     /**
-     * Returns the file that this RecordWriter is writing to
-     *
-     * @return
+     * @return the file that this RecordWriter is writing to
      */
     File getFile();
 
@@ -73,19 +69,18 @@ public interface RecordWriter extends Closeable {
      * not immediately available, returns <code>false</code>; otherwise, obtains
      * the lock and returns <code>true</code>.
      *
-     * @return
+     * @return <code>true</code> if the lock was obtained, <code>false</code> otherwise.
      */
     boolean tryLock();
 
     /**
      * Syncs the content written to this writer to disk.
-     * @throws java.io.IOException
+     * @throws IOException if unable to sync content to disk
      */
     void sync() throws IOException;
 
     /**
-     * Returns the TOC Writer that is being used to write the Table of Contents for this journal
-     * @return
+     * @return the TOC Writer that is being used to write the Table of Contents for this journal
      */
     TocWriter getTocWriter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index 47b7c7e..cf8f7b4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -25,14 +25,14 @@ import org.apache.nifi.provenance.toc.TocUtil;
 import org.apache.nifi.provenance.toc.TocWriter;
 
 public class RecordWriters {
-	private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024;	// 1 MB
+    private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
 
     public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
-    	return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+        return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
     }
-    
+
     public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
-    	final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+        final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
         return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
     }