You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/27 20:14:59 UTC

[1/4] incubator-nifi git commit: NIFI-527: Code cleanup

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 10860944d -> 384b2ac25


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
index 8944cec..7c13a2a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocReader.java
@@ -24,9 +24,9 @@ import java.io.IOException;
 
 /**
  * Standard implementation of TocReader.
- * 
+ *
  * Expects .toc file to be in the following format;
- * 
+ *
  * byte 0: version
  * byte 1: boolean: compressionFlag -> 0 = journal is NOT compressed, 1 = journal is compressed
  * byte 2-9: long: offset of block 0
@@ -37,21 +37,21 @@ import java.io.IOException;
 public class StandardTocReader implements TocReader {
     private final boolean compressed;
     private final long[] offsets;
-    
+
     public StandardTocReader(final File file) throws IOException {
         try (final FileInputStream fis = new FileInputStream(file);
-             final DataInputStream dis = new DataInputStream(fis)) {
-            
+                final DataInputStream dis = new DataInputStream(fis)) {
+
             final int version = dis.read();
             if ( version < 0 ) {
                 throw new EOFException();
             }
-            
+
             final int compressionFlag = dis.read();
             if ( compressionFlag < 0 ) {
                 throw new EOFException();
             }
-            
+
             if ( compressionFlag == 0 ) {
                 compressed = false;
             } else if ( compressionFlag == 1 ) {
@@ -59,21 +59,21 @@ public class StandardTocReader implements TocReader {
             } else {
                 throw new IOException("Table of Contents appears to be corrupt: could not read 'compression flag' from header; expected value of 0 or 1 but got " + compressionFlag);
             }
-            
+
             final int numBlocks = (int) ((file.length() - 2) / 8);
             offsets = new long[numBlocks];
-            
+
             for (int i=0; i < numBlocks; i++) {
                 offsets[i] = dis.readLong();
             }
         }
     }
-    
+
     @Override
     public boolean isCompressed() {
         return compressed;
     }
-    
+
     @Override
     public long getBlockOffset(final int blockIndex) {
         if ( blockIndex >= offsets.length ) {
@@ -89,20 +89,20 @@ public class StandardTocReader implements TocReader {
         }
         return offsets[offsets.length - 1];
     }
-    
+
     @Override
     public void close() throws IOException {
     }
 
-	@Override
-	public int getBlockIndex(final long blockOffset) {
-		for (int i=0; i < offsets.length; i++) {
-			if ( offsets[i] > blockOffset ) {
-				return i-1;
-			}
-		}
-		
-		return offsets.length - 1;
-	}
+    @Override
+    public int getBlockIndex(final long blockOffset) {
+        for (int i=0; i < offsets.length; i++) {
+            if ( offsets[i] > blockOffset ) {
+                return i-1;
+            }
+        }
+
+        return offsets.length - 1;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
index 488f225..10de459 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/StandardTocWriter.java
@@ -19,7 +19,6 @@ package org.apache.nifi.provenance.toc;
 import java.io.BufferedOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Standard implementation of {@link TocWriter}.
- * 
+ *
  * Format of .toc file:
  * byte 0: version
  * byte 1: compressed: 0 -> not compressed, 1 -> compressed
@@ -39,27 +38,27 @@ import org.slf4j.LoggerFactory;
  * byte (N*8+2)-(N*8+9): long: offset of block N
  */
 public class StandardTocWriter implements TocWriter {
-	private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
-	
+    private static final Logger logger = LoggerFactory.getLogger(StandardTocWriter.class);
+
     public static final byte VERSION = 1;
-    
+
     private final File file;
     private final FileOutputStream fos;
     private final boolean alwaysSync;
     private int index = -1;
-    
+
     /**
      * Creates a StandardTocWriter that writes to the given file.
      * @param file the file to write to
      * @param compressionFlag whether or not the journal is compressed
-     * @throws FileNotFoundException 
+     * @throws IOException if unable to write header info to the specified file
      */
     public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
         final File tocDir = file.getParentFile();
         if ( !tocDir.exists() ) {
-        	Files.createDirectories(tocDir.toPath());
+            Files.createDirectories(tocDir.toPath());
         }
-        
+
         this.file = file;
         fos = new FileOutputStream(file);
         this.alwaysSync = alwaysSync;
@@ -69,12 +68,12 @@ public class StandardTocWriter implements TocWriter {
         header[1] = (byte) (compressionFlag ? 1 : 0);
         fos.write(header);
         fos.flush();
-        
+
         if ( alwaysSync ) {
             sync();
         }
     }
-    
+
     @Override
     public void addBlockOffset(final long offset) throws IOException {
         final BufferedOutputStream bos = new BufferedOutputStream(fos);
@@ -83,17 +82,17 @@ public class StandardTocWriter implements TocWriter {
         dos.flush();
         index++;
         logger.debug("Adding block {} at offset {}", index, offset);
-        
+
         if ( alwaysSync ) {
             sync();
         }
     }
-    
+
     @Override
     public void sync() throws IOException {
-    	fos.getFD().sync();
+        fos.getFD().sync();
     }
-    
+
     @Override
     public int getCurrentBlockIndex() {
         return index;
@@ -104,15 +103,15 @@ public class StandardTocWriter implements TocWriter {
         if (alwaysSync) {
             fos.getFD().sync();
         }
-        
+
         fos.close();
     }
-    
+
     @Override
     public File getFile() {
         return file;
     }
-    
+
     @Override
     public String toString() {
         return "TOC Writer for " + file;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
index 7c197be..97e2838 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocReader.java
@@ -32,27 +32,31 @@ public interface TocReader extends Closeable {
 
     /**
      * Indicates whether or not the corresponding Journal file is compressed
-     * @return
+     * @return <code>true</code> if the corresponding Journal file is compressed, <code>false</code> otherwise
      */
     boolean isCompressed();
 
     /**
      * Returns the byte offset into the Journal File for the Block with the given index.
-     * @param blockIndex
-     * @return
+     *
+     * @param blockIndex the block index to get the byte offset for
+     * @return the byte offset for the given block index, or <code>-1</code> if the given block index
+     * does not exist
      */
     long getBlockOffset(int blockIndex);
-    
+
     /**
      * Returns the byte offset into the Journal File of the last Block in the given index
-     * @return
+     * @return the byte offset into the Journal File of the last Block listed in this Table of Contents
      */
     long getLastBlockOffset();
-    
+
     /**
      * Returns the index of the block that contains the given offset
-     * @param blockOffset
-     * @return
+     *
+     * @param blockOffset the byte offset for which the block index is desired
+     *
+     * @return the index of the block that contains the given offset
      */
     int getBlockIndex(long blockOffset);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
index c30ac98..3fa7d67 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocUtil.java
@@ -22,16 +22,19 @@ import org.apache.nifi.provenance.lucene.LuceneUtil;
 
 public class TocUtil {
 
-	/**
-	 * Returns the file that should be used as the Table of Contents for the given Journal File
-	 * @param journalFile
-	 * @return
-	 */
-	public static File getTocFile(final File journalFile) {
-    	final File tocDir = new File(journalFile.getParentFile(), "toc");
-    	final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
-    	final File tocFile = new File(tocDir, basename + ".toc");
-    	return tocFile;
-	}
-	
+    /**
+     * Returns the file that should be used as the Table of Contents for the given Journal File.
+     * Note, if no TOC exists for the given Journal File, a File will still be returned but the file
+     * will not actually exist.
+     *
+     * @param journalFile the journal file for which to get the Table of Contents
+     * @return the file that represents the Table of Contents for the specified journal file.
+     */
+    public static File getTocFile(final File journalFile) {
+        final File tocDir = new File(journalFile.getParentFile(), "toc");
+        final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
+        final File tocFile = new File(tocDir, basename + ".toc");
+        return tocFile;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
index c678053..38f910f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/toc/TocWriter.java
@@ -27,26 +27,24 @@ public interface TocWriter extends Closeable {
 
     /**
      * Adds the given block offset as the next Block Offset in the Table of Contents
-     * @param offset
-     * @throws IOException
+     * @param offset the byte offset at which the block begins
+     * @throws IOException if unable to persist the block index
      */
     void addBlockOffset(long offset) throws IOException;
-    
+
     /**
-     * Returns the index of the current Block
-     * @return
+     * @return the index of the current Block
      */
     int getCurrentBlockIndex();
-    
+
     /**
-     * Returns the file that is currently being written to
-     * @return
+     * @return the file that is currently being written to
      */
     File getFile();
 
     /**
      * Synchronizes the data with the underlying storage device
-     * @throws IOException
+     * @throws IOException if unable to synchronize the data with the underlying storage device
      */
     void sync() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 5541ab5..7d97bcd 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -75,7 +75,7 @@ public class TestPersistentProvenanceRepository {
 
     private PersistentProvenanceRepository repo;
     private RepositoryConfiguration config;
-    
+
     public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
 
     private RepositoryConfiguration createConfiguration() {
@@ -89,9 +89,9 @@ public class TestPersistentProvenanceRepository {
 
     @BeforeClass
     public static void setLogLevel() {
-    	System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
     }
-    
+
     @Before
     public void printTestName() {
         System.out.println("\n\n\n***********************  " + name.getMethodName() + "  *****************************");
@@ -105,33 +105,33 @@ public class TestPersistentProvenanceRepository {
             } catch (final IOException ioe) {
             }
         }
-        
+
         // Delete all of the storage files. We do this in order to clean up the tons of files that
         // we create but also to ensure that we have closed all of the file handles. If we leave any
         // streams open, for instance, this will throw an IOException, causing our unit test to fail.
         for ( final File storageDir : config.getStorageDirectories() ) {
-        	int i;
-        	for (i=0; i < 3; i++) {
-        		try {
-        			FileUtils.deleteFile(storageDir, true);
-        			break;
-	        	} catch (final IOException ioe) {
-	        		// if there is a virus scanner, etc. running in the background we may not be able to
-	        		// delete the file. Wait a sec and try again.
-	        		if ( i == 2 ) {
-	        			throw ioe;
-	        		} else {
-	        			try {
-	        				Thread.sleep(1000L);
-	        			} catch (final InterruptedException ie) {
-	        			}
-	        		}
-	        	}
-	        }
+            int i;
+            for (i=0; i < 3; i++) {
+                try {
+                    FileUtils.deleteFile(storageDir, true);
+                    break;
+                } catch (final IOException ioe) {
+                    // if there is a virus scanner, etc. running in the background we may not be able to
+                    // delete the file. Wait a sec and try again.
+                    if ( i == 2 ) {
+                        throw ioe;
+                    } else {
+                        try {
+                            Thread.sleep(1000L);
+                        } catch (final InterruptedException ie) {
+                        }
+                    }
+                }
+            }
         }
     }
 
-    
+
 
     private EventReporter getEventReporter() {
         return new EventReporter() {
@@ -241,7 +241,7 @@ public class TestPersistentProvenanceRepository {
         }
 
         Thread.sleep(1000L);
-        
+
         repo.close();
         Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
 
@@ -431,7 +431,7 @@ public class TestPersistentProvenanceRepository {
         repo.waitForRollover();
 
         final Query query = new Query(UUID.randomUUID().toString());
-//        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
+        //        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
@@ -905,14 +905,14 @@ public class TestPersistentProvenanceRepository {
         secondRepo.initialize(getEventReporter());
 
         try {
-	        final ProvenanceEventRecord event11 = builder.build();
-	        secondRepo.registerEvent(event11);
-	        secondRepo.waitForRollover();
-	        final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
-	        assertNotNull(event11Retrieved);
-	        assertEquals(10, event11Retrieved.getEventId());
+            final ProvenanceEventRecord event11 = builder.build();
+            secondRepo.registerEvent(event11);
+            secondRepo.waitForRollover();
+            final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
+            assertNotNull(event11Retrieved);
+            assertEquals(10, event11Retrieved.getEventId());
         } finally {
-        	secondRepo.close();
+            secondRepo.close();
         }
     }
 
@@ -983,26 +983,26 @@ public class TestPersistentProvenanceRepository {
         storageDirFiles = config.getStorageDirectories().get(0).listFiles(indexFileFilter);
         assertEquals(0, storageDirFiles.length);
     }
-    
-    
+
+
     @Test
     public void testBackPressure() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
-        config.setMaxEventFileCapacity(1L);	// force rollover on each record.
+        config.setMaxEventFileCapacity(1L);  // force rollover on each record.
         config.setJournalCount(1);
-        
+
         final AtomicInteger journalCountRef = new AtomicInteger(0);
-        
-    	repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
-    		@Override
-    		protected int getJournalCount() {
-    			return journalCountRef.get();
-    		}
-    	};
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            protected int getJournalCount() {
+                return journalCountRef.get();
+            }
+        };
         repo.initialize(getEventReporter());
 
-    	final Map<String, String> attributes = new HashMap<>();
-    	final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        final Map<String, String> attributes = new HashMap<>();
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
         builder.setEventTime(System.currentTimeMillis());
         builder.setEventType(ProvenanceEventType.RECEIVE);
         builder.setTransitUri("nifi://unit-test");
@@ -1023,31 +1023,31 @@ public class TestPersistentProvenanceRepository {
 
         final AtomicLong threadNanos = new AtomicLong(0L);
         final Thread t = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				final long start = System.nanoTime();
-		        builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
-		        attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
-		        repo.registerEvent(builder.build());
-		        threadNanos.set(System.nanoTime() - start);
-			}
+            @Override
+            public void run() {
+                final long start = System.nanoTime();
+                builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
+                attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
+                repo.registerEvent(builder.build());
+                threadNanos.set(System.nanoTime() - start);
+            }
         });
         t.start();
 
         Thread.sleep(1500L);
-        
+
         journalCountRef.set(1);
         t.join();
-        
+
         final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
-        assertTrue(threadMillis > 1200);	// use 1200 to account for the fact that the timing is not exact
-        
+        assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
+
         builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
         attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
         repo.registerEvent(builder.build());
     }
-    
-    
+
+
     // TODO: test EOF on merge
     // TODO: Test journal with no records
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index 6f85b94..136f244 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -40,15 +40,15 @@ import org.junit.Test;
 public class TestStandardRecordReaderWriter {
     @BeforeClass
     public static void setLogLevel() {
-    	System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
     }
 
-	private ProvenanceEventRecord createEvent() {
-		final Map<String, String> attributes = new HashMap<>();
-		attributes.put("filename", "1.txt");
+    private ProvenanceEventRecord createEvent() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "1.txt");
         attributes.put("uuid", UUID.randomUUID().toString());
 
-		final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
         builder.setEventTime(System.currentTimeMillis());
         builder.setEventType(ProvenanceEventType.RECEIVE);
         builder.setTransitUri("nifi://unit-test");
@@ -58,132 +58,132 @@ public class TestStandardRecordReaderWriter {
         final ProvenanceEventRecord record = builder.build();
 
         return record;
-	}
-	
-	@Test
-	public void testSimpleWriteWithToc() throws IOException {
+    }
+
+    @Test
+    public void testSimpleWriteWithToc() throws IOException {
         final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
         final File tocFile = TocUtil.getTocFile(journalFile);
         final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
         final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 1024 * 1024);
-        
+
         writer.writeHeader();
         writer.writeRecord(createEvent(), 1L);
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
-        
+
         try (final FileInputStream fis = new FileInputStream(journalFile);
-        	final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
-        	assertEquals(0, reader.getBlockIndex());
-        	reader.skipToBlock(0);
-        	StandardProvenanceEventRecord recovered = reader.nextRecord();
-        	assertNotNull(recovered);
-        	
-        	assertEquals("nifi://unit-test", recovered.getTransitUri());
-        	assertNull(reader.nextRecord());
+                final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            assertEquals(0, reader.getBlockIndex());
+            reader.skipToBlock(0);
+            StandardProvenanceEventRecord recovered = reader.nextRecord();
+            assertNotNull(recovered);
+
+            assertEquals("nifi://unit-test", recovered.getTransitUri());
+            assertNull(reader.nextRecord());
         }
-        
+
         FileUtils.deleteFile(journalFile.getParentFile(), true);
-	}
-	
-	
-	@Test
-	public void testSingleRecordCompressed() throws IOException {
+    }
+
+
+    @Test
+    public void testSingleRecordCompressed() throws IOException {
         final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
         final File tocFile = TocUtil.getTocFile(journalFile);
         final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
         final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
-        
+
         writer.writeHeader();
         writer.writeRecord(createEvent(), 1L);
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
-        
+
         try (final FileInputStream fis = new FileInputStream(journalFile);
-        	final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
-        	assertEquals(0, reader.getBlockIndex());
-        	reader.skipToBlock(0);
-        	StandardProvenanceEventRecord recovered = reader.nextRecord();
-        	assertNotNull(recovered);
-        	
-        	assertEquals("nifi://unit-test", recovered.getTransitUri());
-        	assertNull(reader.nextRecord());
+                final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            assertEquals(0, reader.getBlockIndex());
+            reader.skipToBlock(0);
+            StandardProvenanceEventRecord recovered = reader.nextRecord();
+            assertNotNull(recovered);
+
+            assertEquals("nifi://unit-test", recovered.getTransitUri());
+            assertNull(reader.nextRecord());
         }
-        
+
         FileUtils.deleteFile(journalFile.getParentFile(), true);
-	}
-	
-	
-	@Test
-	public void testMultipleRecordsSameBlockCompressed() throws IOException {
+    }
+
+
+    @Test
+    public void testMultipleRecordsSameBlockCompressed() throws IOException {
         final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
         final File tocFile = TocUtil.getTocFile(journalFile);
         final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
         // new record each 1 MB of uncompressed data
         final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 1024 * 1024);
-        
+
         writer.writeHeader();
         for (int i=0; i < 10; i++) {
-        	writer.writeRecord(createEvent(), i);
+            writer.writeRecord(createEvent(), i);
         }
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
-        
+
         try (final FileInputStream fis = new FileInputStream(journalFile);
-        	final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
-        	for (int i=0; i < 10; i++) {
-	        	assertEquals(0, reader.getBlockIndex());
-	        	
-	        	// call skipToBlock half the time to ensure that we can; avoid calling it
-	        	// the other half of the time to ensure that it's okay.
-	        	if (i <= 5) {
-	        		reader.skipToBlock(0);
-	        	}
-	        	
-	        	StandardProvenanceEventRecord recovered = reader.nextRecord();
-	        	assertNotNull(recovered);
-	        	assertEquals("nifi://unit-test", recovered.getTransitUri());
-        	}
-        	
-        	assertNull(reader.nextRecord());
+                final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            for (int i=0; i < 10; i++) {
+                assertEquals(0, reader.getBlockIndex());
+
+                // call skipToBlock half the time to ensure that we can; avoid calling it
+                // the other half of the time to ensure that it's okay.
+                if (i <= 5) {
+                    reader.skipToBlock(0);
+                }
+
+                StandardProvenanceEventRecord recovered = reader.nextRecord();
+                assertNotNull(recovered);
+                assertEquals("nifi://unit-test", recovered.getTransitUri());
+            }
+
+            assertNull(reader.nextRecord());
         }
-        
+
         FileUtils.deleteFile(journalFile.getParentFile(), true);
-	}
-	
-	
-	@Test
-	public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
+    }
+
+
+    @Test
+    public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
         final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
         final File tocFile = TocUtil.getTocFile(journalFile);
         final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
         // new block each 10 bytes
         final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, true, 100);
-        
+
         writer.writeHeader();
         for (int i=0; i < 10; i++) {
-        	writer.writeRecord(createEvent(), i);
+            writer.writeRecord(createEvent(), i);
         }
         writer.close();
 
         final TocReader tocReader = new StandardTocReader(tocFile);
-        
+
         try (final FileInputStream fis = new FileInputStream(journalFile);
-        	final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
-        	for (int i=0; i < 10; i++) {
-	        	StandardProvenanceEventRecord recovered = reader.nextRecord();
-	        	System.out.println(recovered);
-	        	assertNotNull(recovered);
-	        	assertEquals((long) i, recovered.getEventId());
-	        	assertEquals("nifi://unit-test", recovered.getTransitUri());
-        	}
-        	
-        	assertNull(reader.nextRecord());
+                final StandardRecordReader reader = new StandardRecordReader(fis, journalFile.getName(), tocReader)) {
+            for (int i=0; i < 10; i++) {
+                StandardProvenanceEventRecord recovered = reader.nextRecord();
+                System.out.println(recovered);
+                assertNotNull(recovered);
+                assertEquals((long) i, recovered.getEventId());
+                assertEquals("nifi://unit-test", recovered.getTransitUri());
+            }
+
+            assertNull(reader.nextRecord());
         }
-        
+
         FileUtils.deleteFile(journalFile.getParentFile(), true);
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
index 7459fe8..eb0f736 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -24,7 +24,7 @@ import java.util.Set;
 import org.apache.nifi.flowfile.FlowFile;
 
 public class TestUtil {
-	public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
+    public static FlowFile createFlowFile(final long id, final long fileSize, final Map<String, String> attributes) {
         final Map<String, String> attrCopy = new HashMap<>(attributes);
 
         return new FlowFile() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
index 30326e7..87400a0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocReader.java
@@ -38,7 +38,7 @@ public class TestStandardTocReader {
             out.write(0);
             out.write(0);
         }
-        
+
         try {
             try(final StandardTocReader reader = new StandardTocReader(file)) {
                 assertFalse(reader.isCompressed());
@@ -46,13 +46,13 @@ public class TestStandardTocReader {
         } finally {
             file.delete();
         }
-        
-        
+
+
         try (final OutputStream out = new FileOutputStream(file)) {
             out.write(0);
             out.write(1);
         }
-        
+
         try {
             try(final StandardTocReader reader = new StandardTocReader(file)) {
                 assertTrue(reader.isCompressed());
@@ -61,25 +61,25 @@ public class TestStandardTocReader {
             file.delete();
         }
     }
-    
-    
+
+
     @Test
     public void testGetBlockIndex() throws IOException {
         final File file = new File("target/" + UUID.randomUUID().toString());
         try (final OutputStream out = new FileOutputStream(file);
-             final DataOutputStream dos = new DataOutputStream(out)) {
+                final DataOutputStream dos = new DataOutputStream(out)) {
             out.write(0);
             out.write(0);
-            
+
             for (int i=0; i < 1024; i++) {
                 dos.writeLong(i * 1024L);
             }
         }
-        
+
         try {
             try(final StandardTocReader reader = new StandardTocReader(file)) {
                 assertFalse(reader.isCompressed());
-                
+
                 for (int i=0; i < 1024; i++) {
                     assertEquals(i * 1024, reader.getBlockOffset(i));
                 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
index 70f55a2..aebe0d5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/toc/TestStandardTocWriter.java
@@ -31,12 +31,12 @@ public class TestStandardTocWriter {
         final File tocFile = new File("target/" + UUID.randomUUID().toString() + ".toc");
         try {
             assertTrue( tocFile.createNewFile() );
-            
+
             try (final StandardTocWriter writer = new StandardTocWriter(tocFile, false, false)) {
             }
         } finally {
             FileUtils.deleteFile(tocFile, false);
         }
     }
-    
+
 }


[4/4] incubator-nifi git commit: Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop

Posted by ma...@apache.org.
Merge branch 'develop' of http://git-wip-us.apache.org/repos/asf/incubator-nifi into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/384b2ac2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/384b2ac2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/384b2ac2

Branch: refs/heads/develop
Commit: 384b2ac2535987a42ae36568d285c829461a1587
Parents: 3cd18b0 1086094
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 14:14:48 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 14:14:48 2015 -0400

----------------------------------------------------------------------
 nifi-parent/pom.xml                             |   6 +-
 .../nifi-hadoop-bundle/nifi-hadoop-nar/pom.xml  |  28 +--
 .../hadoop/AbstractHadoopProcessor.java         |   7 +-
 .../hadoop/CreateHadoopSequenceFile.java        |  28 +--
 .../apache/nifi/processors/hadoop/GetHDFS.java  |  60 +++----
 .../processors/hadoop/GetHDFSSequenceFile.java  |  18 +-
 .../nifi/processors/hadoop/KeyValueReader.java  |   6 +-
 .../nifi/processors/hadoop/ValueReader.java     |   5 +-
 .../hadoop/util/ByteFilteringOutputStream.java  |  24 +--
 .../hadoop/util/InputStreamWritable.java        |   6 +-
 .../hadoop/util/OutputStreamWritable.java       |   3 +-
 .../hadoop/util/SequenceFileWriter.java         |  12 +-
 .../nifi/processors/standard/BinFiles.java      |  15 +-
 .../processors/standard/CompressContent.java    |   9 +-
 .../nifi/processors/standard/ControlRate.java   |  11 +-
 .../standard/ConvertCharacterSet.java           |  22 ++-
 .../processors/standard/DistributeLoad.java     |  17 +-
 .../processors/standard/EvaluateJsonPath.java   |  36 ++--
 .../nifi/processors/standard/EvaluateXPath.java |   9 +-
 .../processors/standard/EvaluateXQuery.java     |   6 +-
 .../processors/standard/ExecuteProcess.java     |   9 +-
 .../standard/ExecuteStreamCommand.java          |  21 +--
 .../nifi/processors/standard/ExtractText.java   |   3 +-
 .../processors/standard/GenerateFlowFile.java   |   3 +-
 .../nifi/processors/standard/GetFile.java       |   6 +-
 .../nifi/processors/standard/GetJMSTopic.java   |   3 +-
 .../processors/standard/HandleHttpRequest.java  |  17 +-
 .../processors/standard/HandleHttpResponse.java |   3 +-
 .../nifi/processors/standard/HashAttribute.java |  10 +-
 .../nifi/processors/standard/InvokeHTTP.java    |  24 ++-
 .../nifi/processors/standard/JmsConsumer.java   |   6 +-
 .../nifi/processors/standard/ListenUDP.java     | 178 +++++++++----------
 .../nifi/processors/standard/MergeContent.java  |   6 +-
 .../nifi/processors/standard/PostHTTP.java      |  47 +++--
 .../nifi/processors/standard/PutEmail.java      |   6 +-
 .../apache/nifi/processors/standard/PutFTP.java |   9 +-
 .../processors/standard/PutFileTransfer.java    |  11 +-
 .../apache/nifi/processors/standard/PutJMS.java |   3 +-
 .../nifi/processors/standard/PutSFTP.java       |   6 +-
 .../nifi/processors/standard/ReplaceText.java   |  28 ++-
 .../standard/ReplaceTextWithMapping.java        |  13 +-
 .../processors/standard/RouteOnAttribute.java   |   6 +-
 .../nifi/processors/standard/ScanAttribute.java |   5 +-
 .../nifi/processors/standard/SplitContent.java  |   9 +-
 .../nifi/processors/standard/SplitText.java     |  18 +-
 .../nifi/processors/standard/SplitXml.java      |   3 +-
 .../nifi/processors/standard/TransformXml.java  |  52 +++---
 .../nifi/processors/standard/UnpackContent.java |  39 ++--
 .../nifi/processors/standard/ValidateXml.java   |  51 +++---
 .../servlets/ContentAcknowledgmentServlet.java  |   5 -
 .../standard/servlets/ListenHTTPServlet.java    |   5 -
 .../nifi/processors/standard/util/Bin.java      |  22 +--
 .../processors/standard/util/BinManager.java    |   2 +-
 .../standard/util/DocumentReaderCallback.java   |  10 +-
 .../processors/standard/util/FTPTransfer.java   |   3 +-
 .../nifi/processors/standard/util/FTPUtils.java |   2 +-
 .../processors/standard/util/FileTransfer.java  |  29 ++-
 .../processors/standard/util/SFTPTransfer.java  |   9 +-
 .../standard/util/XmlSplitterSaxParser.java     |  11 +-
 .../processors/standard/TestDistributeLoad.java |   3 +-
 .../standard/TestHandleHttpRequest.java         |   4 +-
 .../distributed/cache/client/CommsSession.java  |  16 +-
 .../DistributedMapCacheClientService.java       |   7 +-
 .../DistributedSetCacheClientService.java       |   6 +-
 .../cache/client/SSLCommsSession.java           |  25 +--
 .../cache/client/StandardCommsSession.java      |   1 +
 .../additionalDetails.html                      |  60 +++----
 .../cache/server/AbstractCacheServer.java       |  25 +--
 .../distributed/cache/server/CacheRecord.java   |  12 +-
 .../distributed/cache/server/CacheServer.java   |   3 +-
 .../cache/server/DistributedCacheServer.java    |   3 +-
 .../cache/server/DistributedSetCacheServer.java |  13 +-
 .../cache/server/EvictionPolicy.java            |  24 +--
 .../cache/server/SetCacheServer.java            |  25 +--
 .../server/map/DistributedMapCacheServer.java   |  12 +-
 .../distributed/cache/server/map/MapCache.java  |   4 +
 .../cache/server/map/MapCacheRecord.java        |  19 +-
 .../cache/server/map/MapCacheServer.java        | 113 ++++++------
 .../cache/server/map/MapPutResult.java          |   5 +-
 .../cache/server/map/PersistentMapCache.java    |  51 +++---
 .../cache/server/map/SimpleMapCache.java        |  47 ++---
 .../cache/server/set/PersistentSetCache.java    |  57 +++---
 .../distributed/cache/server/set/SetCache.java  |   5 +-
 .../cache/server/set/SetCacheRecord.java        |  15 +-
 .../cache/server/set/SetCacheResult.java        |  11 +-
 .../cache/server/set/SimpleSetCache.java        |  41 ++---
 .../additionalDetails.html                      |  62 +++----
 .../cache/server/TestServerAndClient.java       |   9 +-
 .../nifi-http-context-map-api/pom.xml           |  34 ++--
 .../org/apache/nifi/http/HttpContextMap.java    |  45 +++--
 .../nifi-http-context-map/pom.xml               |  20 +--
 .../nifi/http/StandardHttpContextMap.java       |  83 ++++-----
 .../index.html                                  |  36 ++--
 .../nifi/ssl/StandardSSLContextService.java     |   3 +-
 .../apache/nifi/ssl/SSLContextServiceTest.java  |   4 +-
 95 files changed, 916 insertions(+), 1008 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-nifi git commit: NIFI-527: Code cleanup

Posted by ma...@apache.org.
NIFI-527: Code cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3cd18b0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3cd18b0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3cd18b0b

Branch: refs/heads/develop
Commit: 3cd18b0babc5133e35a2771bc0d0acaf974c381f
Parents: 666de3d
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Apr 27 14:13:55 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Apr 27 14:13:55 2015 -0400

----------------------------------------------------------------------
 .../nifi/provenance/IndexConfiguration.java     |  12 +-
 .../PersistentProvenanceRepository.java         | 612 +++++++-------
 .../provenance/RepositoryConfiguration.java     | 106 +--
 .../nifi/provenance/StandardRecordReader.java   | 246 +++---
 .../nifi/provenance/StandardRecordWriter.java   | 138 ++--
 .../provenance/expiration/ExpirationAction.java |   6 +-
 .../provenance/lucene/DeleteIndexAction.java    |  12 +-
 .../nifi/provenance/lucene/DocsReader.java      |  79 +-
 .../nifi/provenance/lucene/IndexManager.java    | 820 +++++++++----------
 .../nifi/provenance/lucene/IndexSearch.java     |  38 +-
 .../nifi/provenance/lucene/IndexingAction.java  | 119 +--
 .../nifi/provenance/lucene/LineageQuery.java    |   6 +-
 .../nifi/provenance/lucene/LuceneUtil.java      |  38 +-
 .../provenance/rollover/CompressionAction.java  |  59 --
 .../provenance/rollover/RolloverAction.java     |  35 -
 .../provenance/serialization/RecordReader.java  |  57 +-
 .../provenance/serialization/RecordReaders.java | 136 +--
 .../provenance/serialization/RecordWriter.java  |  23 +-
 .../provenance/serialization/RecordWriters.java |   8 +-
 .../nifi/provenance/toc/StandardTocReader.java  |  44 +-
 .../nifi/provenance/toc/StandardTocWriter.java  |  35 +-
 .../apache/nifi/provenance/toc/TocReader.java   |  20 +-
 .../org/apache/nifi/provenance/toc/TocUtil.java |  27 +-
 .../apache/nifi/provenance/toc/TocWriter.java   |  16 +-
 .../TestPersistentProvenanceRepository.java     | 118 +--
 .../TestStandardRecordReaderWriter.java         | 162 ++--
 .../org/apache/nifi/provenance/TestUtil.java    |   2 +-
 .../provenance/toc/TestStandardTocReader.java   |  20 +-
 .../provenance/toc/TestStandardTocWriter.java   |   4 +-
 29 files changed, 1391 insertions(+), 1607 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index a5474d5..3beab65 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -92,7 +92,7 @@ public class IndexConfiguration {
             }
             return firstRecord.getEventTime();
         } catch (final FileNotFoundException | EOFException fnf) {
-            return null;	// file no longer exists or there's no record in this file
+            return null; // file no longer exists or there's no record in this file
         } catch (final IOException ioe) {
             logger.warn("Failed to read first entry in file {} due to {}", provenanceLogFile, ioe.toString());
             logger.warn("", ioe);
@@ -201,7 +201,8 @@ public class IndexConfiguration {
      * desired
      * @param endTime the end time of the query for which the indices are
      * desired
-     * @return
+     * @return the index directories that are applicable only for the given time
+     * span (times inclusive).
      */
     public List<File> getIndexDirectories(final Long startTime, final Long endTime) {
         if (startTime == null && endTime == null) {
@@ -252,7 +253,8 @@ public class IndexConfiguration {
      *
      * @param provenanceLogFile the provenance log file for which the index
      * directories are desired
-     * @return
+     * @return the index directories that are applicable only for the given
+     * event log
      */
     public List<File> getIndexDirectories(final File provenanceLogFile) {
         final List<File> dirs = new ArrayList<>();
@@ -334,9 +336,7 @@ public class IndexConfiguration {
     }
 
     /**
-     * Returns the amount of disk space in bytes used by all of the indices
-     *
-     * @return
+     * @return the amount of disk space in bytes used by all of the indices
      */
     public long getIndexSize() {
         lock.lock();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 48cc164..fe89a5e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -139,7 +139,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
     private final List<ExpirationAction> expirationActions = new ArrayList<>();
 
-    private final IndexingAction indexingAction;
     private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>();
 
@@ -151,7 +150,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     private final AtomicBoolean initialized = new AtomicBoolean(false);
 
     private final AtomicBoolean repoDirty = new AtomicBoolean(false);
-    // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to 
+    // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to
     // read them. Since this is a very cheap operation to keep them, it's worth the tiny expense for the improved user experience.
     private final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000);
     private EventReporter eventReporter;
@@ -184,13 +183,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         this.indexManager = new IndexManager();
         this.alwaysSync = configuration.isAlwaysSync();
         this.rolloverCheckMillis = rolloverCheckMillis;
-        
-        final List<SearchableField> fields = configuration.getSearchableFields();
-        if (fields != null && !fields.isEmpty()) {
-            indexingAction = new IndexingAction(this, indexConfig);
-        } else {
-            indexingAction = null;
-        }
 
         scheduledExecService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("Provenance Maintenance Thread"));
         queryExecService = Executors.newFixedThreadPool(configuration.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query Thread"));
@@ -205,69 +197,69 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
     @Override
     public void initialize(final EventReporter eventReporter) throws IOException {
-    	writeLock.lock();
-    	try {
-	        if (initialized.getAndSet(true)) {
-	            return;
-	        }
-	
-	        this.eventReporter = eventReporter;
-	
-	        recover();
-	
-	        if (configuration.isAllowRollover()) {
-	            writers = createWriters(configuration, idGenerator.get());
-	        }
-	
-	        if (configuration.isAllowRollover()) {
-	            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-	                @Override
-	                public void run() {
-	                    // Check if we need to roll over
-	                    if (needToRollover()) {
-	                        // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
-	                        // confirm that we still need to.
-	                        writeLock.lock();
-	                        try {
-	                            logger.debug("Obtained write lock to perform periodic rollover");
-	
-	                            if (needToRollover()) {
-	                                try {
-	                                    rollover(false);
-	                                } catch (final Exception e) {
-	                                    logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
-	                                    logger.error("", e);
-	                                }
-	                            }
-	                        } finally {
-	                            writeLock.unlock();
-	                        }
-	                    }
-	                }
-	            }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
-	
-	            scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
-	            scheduledExecService.scheduleWithFixedDelay(new Runnable() {
-	                @Override
-	                public void run() {
-	                    try {
-	                        purgeOldEvents();
-	                    } catch (final Exception e) {
-	                        logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
-	                        if (logger.isDebugEnabled()) {
-	                            logger.error("", e);
-	                        }
-	                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
-	                    }
-	                }
-	            }, 1L, 1L, TimeUnit.MINUTES);
-	
-	            expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
-	            expirationActions.add(new FileRemovalAction());
-	        }
-    	} finally {
-    		writeLock.unlock();
-    	}
+        writeLock.lock();
+        try {
+            if (initialized.getAndSet(true)) {
+                return;
+            }
+
+            this.eventReporter = eventReporter;
+
+            recover();
+
+            if (configuration.isAllowRollover()) {
+                writers = createWriters(configuration, idGenerator.get());
+            }
+
+            if (configuration.isAllowRollover()) {
+                scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Check if we need to roll over
+                        if (needToRollover()) {
+                            // it appears that we do need to roll over. Obtain write lock so that we can do so, and then
+                            // confirm that we still need to.
+                            writeLock.lock();
+                            try {
+                                logger.debug("Obtained write lock to perform periodic rollover");
+
+                                if (needToRollover()) {
+                                    try {
+                                        rollover(false);
+                                    } catch (final Exception e) {
+                                        logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
+                                        logger.error("", e);
+                                    }
+                                }
+                            } finally {
+                                writeLock.unlock();
+                            }
+                        }
+                    }
+                }, rolloverCheckMillis, rolloverCheckMillis, TimeUnit.MILLISECONDS);
+
+                scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 3L, TimeUnit.SECONDS);
+                scheduledExecService.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            purgeOldEvents();
+                        } catch (final Exception e) {
+                            logger.error("Failed to purge old events from Provenance Repo due to {}", e.toString());
+                            if (logger.isDebugEnabled()) {
+                                logger.error("", e);
+                            }
+                            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to purge old events from Provenance Repo due to " + e.toString());
+                        }
+                    }
+                }, 1L, 1L, TimeUnit.MINUTES);
+
+                expirationActions.add(new DeleteIndexAction(this, indexConfig, indexManager));
+                expirationActions.add(new FileRemovalAction());
+            }
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     private static RepositoryConfiguration createRepositoryConfiguration() throws IOException {
@@ -489,28 +481,26 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 maxIdFile = file;
             }
 
-            if (firstId > maxIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) {
+            if (firstId > maxIndexedId) {
                 maxIndexedId = firstId - 1;
             }
 
-            if (firstId < minIndexedId && indexingAction != null && indexingAction.hasBeenPerformed(file)) {
+            if (firstId < minIndexedId) {
                 minIndexedId = firstId;
             }
         }
 
         if (maxIdFile != null) {
-            final boolean lastFileIndexed = indexingAction == null ? false : indexingAction.hasBeenPerformed(maxIdFile);
-
             // Determine the max ID in the last file.
             try (final RecordReader reader = RecordReaders.newRecordReader(maxIdFile, getAllLogFiles())) {
-            	final long eventId = reader.getMaxEventId();
+                final long eventId = reader.getMaxEventId();
                 if (eventId > maxId) {
                     maxId = eventId;
                 }
 
                 // If the ID is greater than the max indexed id and this file was indexed, then
                 // update the max indexed id
-                if (eventId > maxIndexedId && lastFileIndexed) {
+                if (eventId > maxIndexedId) {
                     maxIndexedId = eventId;
                 }
             } catch (final IOException ioe) {
@@ -567,7 +557,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             // Read the records in the last file to find its max id
             if (greatestMinIdFile != null) {
                 try (final RecordReader recordReader = RecordReaders.newRecordReader(greatestMinIdFile, Collections.<Path>emptyList())) {
-                	maxId = recordReader.getMaxEventId();
+                    maxId = recordReader.getMaxEventId();
                 }
             }
 
@@ -604,11 +594,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             queryExecService.shutdownNow();
 
             indexManager.close();
-            
+
             if ( writers != null ) {
-	            for (final RecordWriter writer : writers) {
-	                writer.close();
-	            }
+                for (final RecordWriter writer : writers) {
+                    writer.close();
+                }
             }
         } finally {
             writeLock.unlock();
@@ -624,7 +614,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         readLock.lock();
         try {
             if (repoDirty.get()) {
-                logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. Will not attempt to persist more records until the repo has been rolled over.");
+                logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. "
+                        + "Will not attempt to persist more records until the repo has been rolled over.");
                 return;
             }
 
@@ -670,7 +661,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             } catch (final IOException ioe) {
                 logger.error("Failed to persist Provenance Event due to {}. Will not attempt to write to the Provenance Repository again until the repository has rolled over.", ioe.toString());
                 logger.error("", ioe);
-                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() + ". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() +
+                        ". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
 
                 // Switch from readLock to writeLock so that we can perform rollover
                 readLock.unlock();
@@ -735,9 +727,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     /**
      * Returns the size, in bytes, of the Repository storage
      *
-     * @param logFiles
-     * @param timeCutoff
-     * @return
+     * @param logFiles the log files to consider
+     * @param timeCutoff if a log file's last modified date is before timeCutoff, it will be skipped
+     * @return the size of all log files given whose last mod date comes after (or equal to) timeCutoff
      */
     public long getSize(final List<File> logFiles, final long timeCutoff) {
         long bytesUsed = 0L;
@@ -760,7 +752,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     /**
      * Purges old events from the repository
      *
-     * @throws IOException
+     * @throws IOException if unable to purge old events due to an I/O problem
      */
     void purgeOldEvents() throws IOException {
         while (!recoveryFinished.get()) {
@@ -858,12 +850,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
                 removed.add(baseName);
             } catch (final FileNotFoundException fnf) {
-                logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not perform additional Expiration Actions on this file", currentAction, file);
+                logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not "
+                        + "perform additional Expiration Actions on this file", currentAction, file);
                 removed.add(baseName);
             } catch (final Throwable t) {
-                logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional Expiration Actions on this file at this time", currentAction, file, t.toString());
+                logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional "
+                        + "Expiration Actions on this file at this time", currentAction, file, t.toString());
                 logger.warn("", t);
-                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction + " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions on this file at this time");
+                eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction +
+                        " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " +
+                        "on this file at this time");
             }
         }
 
@@ -906,24 +902,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
     // made protected for testing purposes
     protected int getJournalCount() {
-    	// determine how many 'journals' we have in the journals directories
+        // determine how many 'journals' we have in the journals directories
         int journalFileCount = 0;
         for ( final File storageDir : configuration.getStorageDirectories() ) {
-        	final File journalsDir = new File(storageDir, "journals");
-        	final File[] journalFiles = journalsDir.listFiles();
-        	if ( journalFiles != null ) {
-        		journalFileCount += journalFiles.length;
-        	}
+            final File journalsDir = new File(storageDir, "journals");
+            final File[] journalFiles = journalsDir.listFiles();
+            if ( journalFiles != null ) {
+                journalFileCount += journalFiles.length;
+            }
         }
-        
+
         return journalFileCount;
     }
-    
+
     /**
      * MUST be called with the write lock held
      *
-     * @param force
-     * @throws IOException
+     * @param force if true, will force a rollover regardless of whether or not data has been written
+     * @throws IOException if unable to complete rollover
      */
     private void rollover(final boolean force) throws IOException {
         if (!configuration.isAllowRollover()) {
@@ -938,44 +934,44 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 final File writerFile = writer.getFile();
                 journalsToMerge.add(writerFile);
                 try {
-                	writer.close();
+                    writer.close();
                 } catch (final IOException ioe) {
-                	logger.warn("Failed to close {} due to {}", writer, ioe.toString());
-                	if ( logger.isDebugEnabled() ) {
-                		logger.warn("", ioe);
-                	}
+                    logger.warn("Failed to close {} due to {}", writer, ioe.toString());
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
                 }
             }
             if ( logger.isDebugEnabled() ) {
-            	logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
+                logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
             int journalFileCount = getJournalCount();
             final int journalCountThreshold = configuration.getJournalCount() * 5;
             if ( journalFileCount > journalCountThreshold ) {
-            	logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
-            			+ "Slowing down flow to accomodate. Currently, there are {} journal files and "
-            			+ "threshold for blocking is {}", journalFileCount, journalCountThreshold);
-            	eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
-            			+ "exceeding the provenance recording rate. Slowing down flow to accomodate");
-            	
-            	while (journalFileCount > journalCountThreshold) {
-            		try {
-            			Thread.sleep(1000L);
-            		} catch (final InterruptedException ie) {
-            		}
-            		
-                	logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
-                			+ "to accomodate. Currently, there are {} journal files and "
-                			+ "threshold for blocking is {}", journalFileCount, journalCountThreshold);
-
-            		journalFileCount = getJournalCount();
-            	}
-            	
-            	logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of "
-            			+ "journal files to be rolled over is {}", journalFileCount);
-            }
-            
+                logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+                        + "Slowing down flow to accomodate. Currently, there are {} journal files and "
+                        + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
+                eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
+                        + "exceeding the provenance recording rate. Slowing down flow to accomodate");
+
+                while (journalFileCount > journalCountThreshold) {
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {
+                    }
+
+                    logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
+                            + "to accomodate. Currently, there are {} journal files and "
+                            + "threshold for blocking is {}", journalFileCount, journalCountThreshold);
+
+                    journalFileCount = getJournalCount();
+                }
+
+                logger.info("Provenance Repository has no caught up with rolling over journal files. Current number of "
+                        + "journal files to be rolled over is {}", journalFileCount);
+            }
+
             writers = createWriters(configuration, idGenerator.get());
             streamStartTime.set(System.currentTimeMillis());
             recordsWrittenSinceRollover.getAndSet(0);
@@ -989,24 +985,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             final Runnable rolloverRunnable = new Runnable() {
                 @Override
                 public void run() {
-                	try {
-	                    final File fileRolledOver;
-	
-	                    try {
-	                        fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
-	                        repoDirty.set(false);
-	                    } catch (final IOException ioe) {
-	                        repoDirty.set(true);
-	                        logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
-	                        logger.error("", ioe);
-	                        return;
-	                    }
-	
-	                    if (fileRolledOver == null) {
-	                        return;
-	                    }
-	                    File file = fileRolledOver;
-	
+                    try {
+                        final File fileRolledOver;
+
+                        try {
+                            fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
+                            repoDirty.set(false);
+                        } catch (final IOException ioe) {
+                            repoDirty.set(true);
+                            logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+                            logger.error("", ioe);
+                            return;
+                        }
+
+                        if (fileRolledOver == null) {
+                            return;
+                        }
+                        File file = fileRolledOver;
+
                         // update our map of id to Path
                         // need lock to update the map, even though it's an AtomicReference, AtomicReference allows those doing a
                         // get() to obtain the most up-to-date version but we use a writeLock to prevent multiple threads modifying
@@ -1021,24 +1017,24 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                         } finally {
                             writeLock.unlock();
                         }
-	
-	                    logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
-	                    rolloverCompletions.getAndIncrement();
-	                    
-	                    // We have finished successfully. Cancel the future so that we don't run anymore
-	                    Future<?> future;
-	                    while ((future = futureReference.get()) == null) {
-	                    	try {
-	                    		Thread.sleep(10L);
-	                    	} catch (final InterruptedException ie) {
-	                    	}
-	                    }
-	                    
-	                    future.cancel(false);
-	                } catch (final Throwable t) {
-	                	logger.error("Failed to rollover Provenance repository due to {}", t.toString());
-	                	logger.error("", t);
-	                }
+
+                        logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+                        rolloverCompletions.getAndIncrement();
+
+                        // We have finished successfully. Cancel the future so that we don't run anymore
+                        Future<?> future;
+                        while ((future = futureReference.get()) == null) {
+                            try {
+                                Thread.sleep(10L);
+                            } catch (final InterruptedException ie) {
+                            }
+                        }
+
+                        future.cancel(false);
+                    } catch (final Throwable t) {
+                        logger.error("Failed to rollover Provenance repository due to {}", t.toString());
+                        logger.error("", t);
+                    }
                 }
             };
 
@@ -1074,10 +1070,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             for (final File journalFile : journalFiles) {
-            	if ( journalFile.isDirectory() ) {
-            		continue;
-            	}
-            	
+                if ( journalFile.isDirectory() ) {
+                    continue;
+                }
+
                 final String basename = LuceneUtil.substringBefore(journalFile.getName(), ".");
                 List<File> files = journalMap.get(basename);
                 if (files == null) {
@@ -1120,83 +1116,84 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         return mergedFile;
     }
 
-    File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
-    	logger.debug("Merging {} to {}", journalFiles, mergedFile);
-    	if ( this.closed ) {
-    		logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
-    		return null;
-    	}
-    	
+    File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter,
+            final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
+        logger.debug("Merging {} to {}", journalFiles, mergedFile);
+        if ( this.closed ) {
+            logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
+            return null;
+        }
+
         if (journalFiles.isEmpty()) {
             return null;
         }
 
         Collections.sort(journalFiles, new Comparator<File>() {
-			@Override
-			public int compare(final File o1, final File o2) {
-				final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
-				final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
-
-				try {
-					final int journalIndex1 = Integer.parseInt(suffix1);
-					final int journalIndex2 = Integer.parseInt(suffix2);
-					return Integer.compare(journalIndex1, journalIndex2);
-				} catch (final NumberFormatException nfe) {
-					return o1.getName().compareTo(o2.getName());
-				}
-			}
+            @Override
+            public int compare(final File o1, final File o2) {
+                final String suffix1 = LuceneUtil.substringAfterLast(o1.getName(), ".");
+                final String suffix2 = LuceneUtil.substringAfterLast(o2.getName(), ".");
+
+                try {
+                    final int journalIndex1 = Integer.parseInt(suffix1);
+                    final int journalIndex2 = Integer.parseInt(suffix2);
+                    return Integer.compare(journalIndex1, journalIndex2);
+                } catch (final NumberFormatException nfe) {
+                    return o1.getName().compareTo(o2.getName());
+                }
+            }
         });
-        
+
         final String firstJournalFile = journalFiles.get(0).getName();
         final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, ".");
         final boolean allPartialFiles = firstFileSuffix.equals("0");
-        
+
         // check if we have all of the "partial" files for the journal.
         if (allPartialFiles) {
-        	if ( mergedFile.exists() ) {
-        		// we have all "partial" files and there is already a merged file. Delete the data from the index
-        		// because the merge file may not be fully merged. We will re-merge.
-        		logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
-        				+ "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
-        		
-        		final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
-        		try {
-        			deleteAction.execute(mergedFile);
-        		} catch (final Exception e) {
-        			logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
-        			if ( logger.isDebugEnabled() ) {
-        				logger.warn("", e);
-        			}
-        		}
-
-        		// Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on
-        		// a different Storage Directory than the original, we need to ensure that we delete both the partially merged
-        		// file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events.
-        		if ( !mergedFile.delete() ) {
-        			logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal "
-        					+ "file not being able to be displayed. This file should be deleted manually.", mergedFile);
-        		}
-        		
-        		final File tocFile = TocUtil.getTocFile(mergedFile);
-        		if ( tocFile.exists() && !tocFile.delete() ) {
-        			logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
-        					+ "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile);
-        		}
-        	}
+            if ( mergedFile.exists() ) {
+                // we have all "partial" files and there is already a merged file. Delete the data from the index
+                // because the merge file may not be fully merged. We will re-merge.
+                logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
+                        + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
+
+                final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
+                try {
+                    deleteAction.execute(mergedFile);
+                } catch (final Exception e) {
+                    logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", e);
+                    }
+                }
+
+                // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on
+                // a different Storage Directory than the original, we need to ensure that we delete both the partially merged
+                // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events.
+                if ( !mergedFile.delete() ) {
+                    logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal "
+                            + "file not being able to be displayed. This file should be deleted manually.", mergedFile);
+                }
+
+                final File tocFile = TocUtil.getTocFile(mergedFile);
+                if ( tocFile.exists() && !tocFile.delete() ) {
+                    logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
+                            + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile);
+                }
+            }
         } else {
-        	logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
-        			+ "but it did not; assuming that the files were already merged but only some finished deletion "
-        			+ "before restart. Deleting remaining partial journal files.", journalFiles);
-        	
-        	for ( final File file : journalFiles ) {
-        		if ( !file.delete() && file.exists() ) {
-        			logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
-        		}
-        	}
-        	
-        	return null;
-        }
-        
+            logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
+                    + "but it did not; assuming that the files were already merged but only some finished deletion "
+                    + "before restart. Deleting remaining partial journal files.", journalFiles);
+
+            for ( final File file : journalFiles ) {
+                if ( !file.delete() && file.exists() ) {
+                    logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
+                }
+            }
+
+            return null;
+        }
+
         final long startNanos = System.nanoTime();
 
         // Map each journal to a RecordReader
@@ -1241,12 +1238,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     record = reader.nextRecord();
                 } catch (final EOFException eof) {
                 } catch (final Exception e) {
-                    logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't completely written to the file. This record will be skipped.");
+                    logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't "
+                            + "completely written to the file. This record will be skipped.");
                     if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
 
-                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + "; it's possible that hte record wasn't completely written to the file. This record will be skipped.");
+                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e +
+                            "; it's possible that hte record wasn't completely written to the file. This record will be skipped.");
                 }
 
                 if (record == null) {
@@ -1261,47 +1260,47 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
                 writer.writeHeader();
 
-                final IndexingAction indexingAction = new IndexingAction(this, indexConfig);
-                
+                final IndexingAction indexingAction = new IndexingAction(this);
+
                 final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile);
                 final IndexWriter indexWriter = indexManager.borrowIndexWriter(indexingDirectory);
                 try {
-                	long maxId = 0L;
-                	
-	                while (!recordToReaderMap.isEmpty()) {
-	                    final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-	                    final StandardProvenanceEventRecord record = entry.getKey();
-	                    final RecordReader reader = entry.getValue();
-	
-	                    writer.writeRecord(record, record.getEventId());
-	                    final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
-	                    
-	                    indexingAction.index(record, indexWriter, blockIndex);
-	                    maxId = record.getEventId();
-	                    
-	                    ringBuffer.add(record);
-	                    records++;
-	
-	                    // Remove this entry from the map
-	                    recordToReaderMap.remove(record);
-	
-	                    // Get the next entry from this reader and add it to the map
-	                    StandardProvenanceEventRecord nextRecord = null;
-	
-	                    try {
-	                        nextRecord = reader.nextRecord();
-	                    } catch (final EOFException eof) {
-	                    }
-	
-	                    if (nextRecord != null) {
-	                        recordToReaderMap.put(nextRecord, reader);
-	                    }
-	                }
-	                
-	                indexWriter.commit();
-	                indexConfig.setMaxIdIndexed(maxId);
+                    long maxId = 0L;
+
+                    while (!recordToReaderMap.isEmpty()) {
+                        final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
+                        final StandardProvenanceEventRecord record = entry.getKey();
+                        final RecordReader reader = entry.getValue();
+
+                        writer.writeRecord(record, record.getEventId());
+                        final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
+
+                        indexingAction.index(record, indexWriter, blockIndex);
+                        maxId = record.getEventId();
+
+                        ringBuffer.add(record);
+                        records++;
+
+                        // Remove this entry from the map
+                        recordToReaderMap.remove(record);
+
+                        // Get the next entry from this reader and add it to the map
+                        StandardProvenanceEventRecord nextRecord = null;
+
+                        try {
+                            nextRecord = reader.nextRecord();
+                        } catch (final EOFException eof) {
+                        }
+
+                        if (nextRecord != null) {
+                            recordToReaderMap.put(nextRecord, reader);
+                        }
+                    }
+
+                    indexWriter.commit();
+                    indexConfig.setMaxIdIndexed(maxId);
                 } finally {
-                	indexManager.returnIndexWriter(indexingDirectory, indexWriter);
+                    indexManager.returnIndexWriter(indexingDirectory, indexWriter);
                 }
             }
         } finally {
@@ -1319,7 +1318,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
                 eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
             }
-            
+
             final File tocFile = TocUtil.getTocFile(journalFile);
             if (!tocFile.delete() && tocFile.exists()) {
                 logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath());
@@ -1374,7 +1373,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     public QuerySubmission submitQuery(final Query query) {
         final int numQueries = querySubmissionMap.size();
         if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
-            throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
+            throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not "
+                    + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
         }
 
         if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
@@ -1416,7 +1416,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         final AtomicInteger retrievalCount = new AtomicInteger(0);
         final List<File> indexDirectories = indexConfig.getIndexDirectories(
                 query.getStartDate() == null ? null : query.getStartDate().getTime(),
-                query.getEndDate() == null ? null : query.getEndDate().getTime());
+                        query.getEndDate() == null ? null : query.getEndDate().getTime());
         final AsyncQuerySubmission result = new AsyncQuerySubmission(query, indexDirectories.size());
         querySubmissionMap.put(query.getIdentifier(), result);
 
@@ -1432,11 +1432,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     }
 
     /**
-     * REMOVE-ME: This is for testing only and can be removed.
+     * This is for testing only and not actually used other than in debugging
      *
-     * @param luceneQuery
-     * @return
-     * @throws IOException
+     * @param luceneQuery the lucene query to execute
+     * @return an Iterator of ProvenanceEventRecord that match the query
+     * @throws IOException if unable to perform the query
      */
     public Iterator<ProvenanceEventRecord> queryLucene(final org.apache.lucene.search.Query luceneQuery) throws IOException {
         final List<File> indexFiles = indexConfig.getIndexDirectories();
@@ -1601,7 +1601,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         return computeLineage(Collections.<String>singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
     }
 
-    private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp, final Long endTimestamp) throws IOException {
+    private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp,
+            final Long endTimestamp) throws IOException {
         final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId, startTimestamp, endTimestamp);
         final StandardLineageResult result = submission.getResult();
         while (!result.isFinished()) {
@@ -1623,7 +1624,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
     }
 
-    private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final long startTimestamp, final long endTimestamp) {
+    private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType,
+            final Long eventId, final long startTimestamp, final long endTimestamp) {
         final List<File> indexDirs = indexConfig.getIndexDirectories(startTimestamp, endTimestamp);
         final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size());
         lineageSubmissionMap.put(result.getLineageIdentifier(), result);
@@ -1647,16 +1649,16 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             switch (event.getEventType()) {
-                case CLONE:
-                case FORK:
-                case JOIN:
-                case REPLAY:
-                    return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
-                default:
-                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
-                    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                    submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
-                    return submission;
+            case CLONE:
+            case FORK:
+            case JOIN:
+            case REPLAY:
+                return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
+            default:
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+                submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
+                return submission;
             }
         } catch (final IOException ioe) {
             final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
@@ -1684,17 +1686,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             switch (event.getEventType()) {
-                case JOIN:
-                case FORK:
-                case CLONE:
-                case REPLAY:
-                    return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
-                default: {
-                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
-                    lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                    submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
-                    return submission;
-                }
+            case JOIN:
+            case FORK:
+            case CLONE:
+            case REPLAY:
+                return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, 0L, event.getEventTime());
+            default: {
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+                lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
+                submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
+                return submission;
+            }
             }
         } catch (final IOException ioe) {
             final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
index 3951591..d0d147c 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/RepositoryConfiguration.java
@@ -34,7 +34,7 @@ public class RepositoryConfiguration {
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int journalCount = 16;
     private int compressionBlockBytes = 1024 * 1024;
-    
+
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
@@ -50,19 +50,19 @@ public class RepositoryConfiguration {
         return allowRollover;
     }
 
-    
+
     public int getCompressionBlockBytes() {
-		return compressionBlockBytes;
-	}
+        return compressionBlockBytes;
+    }
 
-	public void setCompressionBlockBytes(int compressionBlockBytes) {
-		this.compressionBlockBytes = compressionBlockBytes;
-	}
+    public void setCompressionBlockBytes(int compressionBlockBytes) {
+        this.compressionBlockBytes = compressionBlockBytes;
+    }
 
-	/**
+    /**
      * Specifies where the repository will store data
      *
-     * @return
+     * @return the directories where provenance files will be stored
      */
     public List<File> getStorageDirectories() {
         return Collections.unmodifiableList(storageDirectories);
@@ -71,18 +71,15 @@ public class RepositoryConfiguration {
     /**
      * Specifies where the repository should store data
      *
-     * @param storageDirectory
+     * @param storageDirectory the directory to store provenance files
      */
     public void addStorageDirectory(final File storageDirectory) {
         this.storageDirectories.add(storageDirectory);
     }
 
     /**
-     * Returns the minimum amount of time that a given record will stay in the
-     * repository
-     *
-     * @param timeUnit
-     * @return
+     * @param timeUnit the desired time unit
+     * @return the max amount of time that a given record will stay in the repository
      */
     public long getMaxRecordLife(final TimeUnit timeUnit) {
         return timeUnit.convert(recordLifeMillis, TimeUnit.MILLISECONDS);
@@ -91,8 +88,8 @@ public class RepositoryConfiguration {
     /**
      * Specifies how long a record should stay in the repository
      *
-     * @param maxRecordLife
-     * @param timeUnit
+     * @param maxRecordLife the max amount of time to keep a record in the repo
+     * @param timeUnit the period of time used by maxRecordLife
      */
     public void setMaxRecordLife(final long maxRecordLife, final TimeUnit timeUnit) {
         this.recordLifeMillis = TimeUnit.MILLISECONDS.convert(maxRecordLife, timeUnit);
@@ -101,7 +98,7 @@ public class RepositoryConfiguration {
     /**
      * Returns the maximum amount of data to store in the repository (in bytes)
      *
-     * @return
+     * @return the maximum amount of disk space to use for the prov repo
      */
     public long getMaxStorageCapacity() {
         return storageCapacity;
@@ -109,107 +106,91 @@ public class RepositoryConfiguration {
 
     /**
      * Sets the maximum amount of data to store in the repository (in bytes)
-     * @param maxStorageCapacity
+     *
+     * @param maxStorageCapacity the maximum amount of disk space to use for the prov repo
      */
     public void setMaxStorageCapacity(final long maxStorageCapacity) {
         this.storageCapacity = maxStorageCapacity;
     }
 
     /**
-     * Returns the maximum amount of time to write to a single event file
-     *
-     * @param timeUnit
-     * @return
+     * @param timeUnit the desired time unit for the returned value
+     * @return the maximum amount of time that the repo will write to a single event file
      */
     public long getMaxEventFileLife(final TimeUnit timeUnit) {
         return timeUnit.convert(eventFileMillis, TimeUnit.MILLISECONDS);
     }
 
     /**
-     * Sets the maximum amount of time to write to a single event file
-     *
-     * @param maxEventFileTime
-     * @param timeUnit
+     * @param maxEventFileTime the max amount of time to write to a single event file
+     * @param timeUnit the units for the value supplied by maxEventFileTime
      */
     public void setMaxEventFileLife(final long maxEventFileTime, final TimeUnit timeUnit) {
         this.eventFileMillis = TimeUnit.MILLISECONDS.convert(maxEventFileTime, timeUnit);
     }
 
     /**
-     * Returns the maximum number of bytes (pre-compression) that will be
+     * @return the maximum number of bytes (pre-compression) that will be
      * written to a single event file before the file is rolled over
-     *
-     * @return
      */
     public long getMaxEventFileCapacity() {
         return eventFileBytes;
     }
 
     /**
-     * Sets the maximum number of bytes (pre-compression) that will be written
+     * @param maxEventFileBytes the maximum number of bytes (pre-compression) that will be written
      * to a single event file before the file is rolled over
-     *
-     * @param maxEventFileBytes
      */
     public void setMaxEventFileCapacity(final long maxEventFileBytes) {
         this.eventFileBytes = maxEventFileBytes;
     }
 
     /**
-     * Returns the fields that can be indexed
-     *
-     * @return
+     * @return the fields that should be indexed
      */
     public List<SearchableField> getSearchableFields() {
         return Collections.unmodifiableList(searchableFields);
     }
 
     /**
-     * Sets the fields to index
-     *
-     * @param searchableFields
+     * @param searchableFields the fields to index
      */
     public void setSearchableFields(final List<SearchableField> searchableFields) {
         this.searchableFields = new ArrayList<>(searchableFields);
     }
 
     /**
-     * Returns the FlowFile attributes that can be indexed
-     *
-     * @return
+     * @return the FlowFile attributes that should be indexed
      */
     public List<SearchableField> getSearchableAttributes() {
         return Collections.unmodifiableList(searchableAttributes);
     }
 
     /**
-     * Sets the FlowFile attributes to index
-     *
-     * @param searchableAttributes
+     * @param searchableAttributes the FlowFile attributes to index
      */
     public void setSearchableAttributes(final List<SearchableField> searchableAttributes) {
         this.searchableAttributes = new ArrayList<>(searchableAttributes);
     }
 
     /**
-     * Indicates whether or not event files will be compressed when they are
+     * @return whether or not event files will be compressed when they are
      * rolled over
-     *
-     * @return
      */
     public boolean isCompressOnRollover() {
         return compress;
     }
 
     /**
-     * Specifies whether or not to compress event files on rollover
-     *
-     * @param compress
+     * @param compress if true, the data will be compressed when rolled over
      */
     public void setCompressOnRollover(final boolean compress) {
         this.compress = compress;
     }
 
+    /**
+     * @return the number of threads to use to query the repo
+     */
     public int getQueryThreadPoolSize() {
         return queryThreadPoolSize;
     }
@@ -246,27 +227,23 @@ public class RepositoryConfiguration {
      * </li>
      * </ol>
      *
-     * @param bytes
+     * @param bytes the number of bytes to write to an index before beginning a new shard
      */
     public void setDesiredIndexSize(final long bytes) {
         this.desiredIndexBytes = bytes;
     }
 
     /**
-     * Returns the desired size of each index shard. See the
-     * {@Link #setDesiredIndexSize} method for an explanation of why we choose
+     * @return the desired size of each index shard. See the
+     * {@link #setDesiredIndexSize} method for an explanation of why we choose
      * to shard the index.
-     *
-     * @return
      */
     public long getDesiredIndexSize() {
         return desiredIndexBytes;
     }
 
     /**
-     * Sets the number of Journal files to use when persisting records.
-     *
-     * @param numJournals
+     * @param numJournals the number of Journal files to use when persisting records.
      */
     public void setJournalCount(final int numJournals) {
         if (numJournals < 1) {
@@ -277,19 +254,14 @@ public class RepositoryConfiguration {
     }
 
     /**
-     * Returns the number of Journal files that will be used when persisting
-     * records.
-     *
-     * @return
+     * @return the number of Journal files that will be used when persisting records.
      */
     public int getJournalCount() {
         return journalCount;
     }
 
     /**
-     * Specifies whether or not the Repository should sync all updates to disk.
-     *
-     * @return
+     * @return <code>true</code> if the repository will perform an 'fsync' for all updates to disk
      */
     public boolean isAlwaysSync() {
         return alwaysSync;
@@ -301,7 +273,7 @@ public class RepositoryConfiguration {
      * persisted across restarted, even if there is a power failure or a sudden
      * Operating System crash, but it can be very expensive.
      *
-     * @param alwaysSync
+     * @param alwaysSync whether or not to perform an 'fsync' for all updates to disk
      */
     public void setAlwaysSync(boolean alwaysSync) {
         this.alwaysSync = alwaysSync;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
index 9bbf195..ca0d5ed 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -39,40 +39,40 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRecordReader implements RecordReader {
-	private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
-	
-	private final ByteCountingInputStream rawInputStream;
+    private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+
+    private final ByteCountingInputStream rawInputStream;
     private final String filename;
     private final int serializationVersion;
     private final boolean compressed;
     private final TocReader tocReader;
     private final int headerLength;
-    
+
     private DataInputStream dis;
     private ByteCountingInputStream byteCountingIn;
 
     public StandardRecordReader(final InputStream in, final String filename) throws IOException {
-    	this(in, filename, null);
+        this(in, filename, null);
     }
-    
+
     public StandardRecordReader(final InputStream in, final String filename, final TocReader tocReader) throws IOException {
-    	logger.trace("Creating RecordReader for {}", filename);
-    	
-    	rawInputStream = new ByteCountingInputStream(in);
+        logger.trace("Creating RecordReader for {}", filename);
+
+        rawInputStream = new ByteCountingInputStream(in);
 
         final InputStream limitedStream;
         if ( tocReader == null ) {
-        	limitedStream = rawInputStream;
+            limitedStream = rawInputStream;
         } else {
-        	final long offset1 = tocReader.getBlockOffset(1);
-        	if ( offset1 < 0 ) {
-        		limitedStream = rawInputStream;
-        	} else {
-        		limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
-        	}
-        }
-        
-    	final InputStream readableStream;
+            final long offset1 = tocReader.getBlockOffset(1);
+            if ( offset1 < 0 ) {
+                limitedStream = rawInputStream;
+            } else {
+                limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
+            }
+        }
+
+        final InputStream readableStream;
         if (filename.endsWith(".gz")) {
             readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
             compressed = true;
@@ -83,11 +83,11 @@ public class StandardRecordReader implements RecordReader {
 
         byteCountingIn = new ByteCountingInputStream(readableStream);
         dis = new DataInputStream(byteCountingIn);
-        
+
         final String repoClassName = dis.readUTF();
         final int serializationVersion = dis.readInt();
-        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4;	// 2 bytes for string length, 4 for integer.
-        
+        headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
+
         if (serializationVersion < 1 || serializationVersion > 8) {
             throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-8");
         }
@@ -99,52 +99,52 @@ public class StandardRecordReader implements RecordReader {
 
     @Override
     public void skipToBlock(final int blockIndex) throws IOException {
-    	if ( tocReader == null ) {
-    		throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
-    	}
-    	
-    	if ( blockIndex < 0 ) {
-    		throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
-    	}
-    	
-    	if ( blockIndex == getBlockIndex() ) {
-    		return;
-    	}
-    	
-    	final long offset = tocReader.getBlockOffset(blockIndex);
-    	if ( offset < 0 ) {
-    		throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
-    	}
-    	
-    	final long curOffset = rawInputStream.getBytesConsumed();
-    	
-    	final long bytesToSkip = offset - curOffset;
-    	if ( bytesToSkip >= 0 ) {
-	    	try {
-	    		StreamUtils.skip(rawInputStream, bytesToSkip);
-	    		logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
-	    	} catch (final IOException e) {
-	    		throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
-	    	}
-	
-	    	resetStreamForNextBlock();
-    	}
+        if ( tocReader == null ) {
+            throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
+        }
+
+        if ( blockIndex < 0 ) {
+            throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
+        }
+
+        if ( blockIndex == getBlockIndex() ) {
+            return;
+        }
+
+        final long offset = tocReader.getBlockOffset(blockIndex);
+        if ( offset < 0 ) {
+            throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
+        }
+
+        final long curOffset = rawInputStream.getBytesConsumed();
+
+        final long bytesToSkip = offset - curOffset;
+        if ( bytesToSkip >= 0 ) {
+            try {
+                StreamUtils.skip(rawInputStream, bytesToSkip);
+                logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
+            } catch (final IOException e) {
+                throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
+            }
+
+            resetStreamForNextBlock();
+        }
     }
-    
+
     private void resetStreamForNextBlock() throws IOException {
-    	final InputStream limitedStream;
+        final InputStream limitedStream;
         if ( tocReader == null ) {
-        	limitedStream = rawInputStream;
+            limitedStream = rawInputStream;
         } else {
-        	final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
-        	if ( offset < 0 ) {
-        		limitedStream = rawInputStream;
-        	} else {
-        		limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
-        	}
-        }
-    	
-    	final InputStream readableStream;
+            final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
+            if ( offset < 0 ) {
+                limitedStream = rawInputStream;
+            } else {
+                limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
+            }
+        }
+
+        final InputStream readableStream;
         if (compressed) {
             readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
         } else {
@@ -154,32 +154,32 @@ public class StandardRecordReader implements RecordReader {
         byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
         dis = new DataInputStream(byteCountingIn);
     }
-    
-    
+
+
     @Override
     public TocReader getTocReader() {
-    	return tocReader;
+        return tocReader;
     }
-    
+
     @Override
     public boolean isBlockIndexAvailable() {
-    	return tocReader != null;
+        return tocReader != null;
     }
-    
+
     @Override
     public int getBlockIndex() {
-    	if ( tocReader == null ) {
-    		throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
-    	}
-    	
-    	return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+        if ( tocReader == null ) {
+            throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
+        }
+
+        return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
     }
-    
+
     @Override
     public long getBytesConsumed() {
-    	return byteCountingIn.getBytesConsumed();
+        return byteCountingIn.getBytesConsumed();
     }
-    
+
     private StandardProvenanceEventRecord readPreVersion6Record() throws IOException {
         final long startOffset = byteCountingIn.getBytesConsumed();
 
@@ -374,17 +374,17 @@ public class StandardRecordReader implements RecordReader {
     }
 
     private String readUUID(final DataInputStream in) throws IOException {
-    	if ( serializationVersion < 8 ) {
-	        final long msb = in.readLong();
-	        final long lsb = in.readLong();
-	        return new UUID(msb, lsb).toString();
-    	} else {
-    		// before version 8, we serialized UUID's as two longs in order to
-    		// write less data. However, in version 8 we changed to just writing
-    		// out the string because it's extremely expensive to call UUID.fromString.
-    		// In the end, since we generally compress, the savings in minimal anyway.
-    		return in.readUTF();
-    	}
+        if ( serializationVersion < 8 ) {
+            final long msb = in.readLong();
+            final long lsb = in.readLong();
+            return new UUID(msb, lsb).toString();
+        } else {
+            // before version 8, we serialized UUID's as two longs in order to
+            // write less data. However, in version 8 we changed to just writing
+            // out the string because it's extremely expensive to call UUID.fromString.
+            // In the end, since we generally compress, the savings are minimal anyway.
+            return in.readUTF();
+        }
     }
 
     private String readNullableString(final DataInputStream in) throws IOException {
@@ -416,53 +416,53 @@ public class StandardRecordReader implements RecordReader {
         byteCountingIn.mark(1);
         int nextByte = byteCountingIn.read();
         byteCountingIn.reset();
-        
+
         if ( nextByte < 0 ) {
-        	try {
-        		resetStreamForNextBlock();
-        	} catch (final EOFException eof) {
-        		return false;
-        	}
-        	
+            try {
+                resetStreamForNextBlock();
+            } catch (final EOFException eof) {
+                return false;
+            }
+
             byteCountingIn.mark(1);
             nextByte = byteCountingIn.read();
             byteCountingIn.reset();
         }
-        
+
         return (nextByte >= 0);
     }
-    
+
     @Override
     public long getMaxEventId() throws IOException {
-    	if ( tocReader != null ) {
-    		final long lastBlockOffset = tocReader.getLastBlockOffset();
-    		skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
-    	}
-    	
-    	ProvenanceEventRecord record;
-    	ProvenanceEventRecord lastRecord = null;
-    	try {
-	    	while ((record = nextRecord()) != null) {
-	    		lastRecord = record;
-	    	}
-    	} catch (final EOFException eof) {
-    		// This can happen if we stop NIFi while the record is being written.
-    		// This is OK, we just ignore this record. The session will not have been
-    		// committed, so we can just process the FlowFile again.
-    	}
-    	
-    	return (lastRecord == null) ? -1L : lastRecord.getEventId();
+        if ( tocReader != null ) {
+            final long lastBlockOffset = tocReader.getLastBlockOffset();
+            skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+        }
+
+        ProvenanceEventRecord record;
+        ProvenanceEventRecord lastRecord = null;
+        try {
+            while ((record = nextRecord()) != null) {
+                lastRecord = record;
+            }
+        } catch (final EOFException eof) {
+            // This can happen if we stop NiFi while the record is being written.
+            // This is OK, we just ignore this record. The session will not have been
+            // committed, so we can just process the FlowFile again.
+        }
+
+        return (lastRecord == null) ? -1L : lastRecord.getEventId();
     }
 
     @Override
     public void close() throws IOException {
-    	logger.trace("Closing Record Reader for {}", filename);
-    	
+        logger.trace("Closing Record Reader for {}", filename);
+
         dis.close();
         rawInputStream.close();
-        
+
         if ( tocReader != null ) {
-        	tocReader.close();
+            tocReader.close();
         }
     }
 
@@ -473,9 +473,9 @@ public class StandardRecordReader implements RecordReader {
 
     @Override
     public void skipTo(final long position) throws IOException {
-    	// we are subtracting headerLength from the number of bytes consumed because we used to 
-    	// consider the offset of the first record "0" - now we consider it whatever position it
-    	// it really is in the stream.
+        // we are subtracting headerLength from the number of bytes consumed because we used to
+        // consider the offset of the first record "0" - now we consider it whatever position it
+        // really is in the stream.
         final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
         if (currentPosition == position) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index dbb2c48..3095f13 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -36,15 +36,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRecordWriter implements RecordWriter {
-	private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
-	
+    private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
+
     private final File file;
     private final FileOutputStream fos;
     private final ByteCountingOutputStream rawOutStream;
     private final TocWriter tocWriter;
     private final boolean compressed;
     private final int uncompressedBlockSize;
-    
+
     private DataOutputStream out;
     private ByteCountingOutputStream byteCountingOut;
     private long lastBlockOffset = 0L;
@@ -52,21 +52,21 @@ public class StandardRecordWriter implements RecordWriter {
 
     private final Lock lock = new ReentrantLock();
 
-    
+
     public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
-    	logger.trace("Creating Record Writer for {}", file.getName());
-    	
+        logger.trace("Creating Record Writer for {}", file.getName());
+
         this.file = file;
         this.compressed = compressed;
         this.fos = new FileOutputStream(file);
         rawOutStream = new ByteCountingOutputStream(fos);
         this.uncompressedBlockSize = uncompressedBlockSize;
-        
+
         this.tocWriter = writer;
     }
 
     static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
-    	out.writeUTF(uuid);
+        out.writeUTF(uuid);
     }
 
     static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@@ -85,49 +85,49 @@ public class StandardRecordWriter implements RecordWriter {
         return file;
     }
 
-	@Override
+    @Override
     public synchronized void writeHeader() throws IOException {
         lastBlockOffset = rawOutStream.getBytesWritten();
         resetWriteStream();
-        
+
         out.writeUTF(PersistentProvenanceRepository.class.getName());
         out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
         out.flush();
     }
-    
+
     private void resetWriteStream() throws IOException {
-    	if ( out != null ) {
-    		out.flush();
-    	}
-
-    	final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
-    	
-    	final OutputStream writableStream;
-    	if ( compressed ) {
-    		// because of the way that GZIPOutputStream works, we need to call close() on it in order for it
-    		// to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
-    		// the underlying OutputStream in a NonCloseableOutputStream
-    		if ( out != null ) {
-    			out.close();
-    		}
-
-        	if ( tocWriter != null ) {
-        		tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
-        	}
-
-    		writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
-    	} else {
-        	if ( tocWriter != null ) {
-        		tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
-        	}
-
-    		writableStream = new BufferedOutputStream(rawOutStream, 65536);
-    	}
-    	
+        if ( out != null ) {
+            out.flush();
+        }
+
+        final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+
+        final OutputStream writableStream;
+        if ( compressed ) {
+            // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+            // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+            // the underlying OutputStream in a NonCloseableOutputStream
+            if ( out != null ) {
+                out.close();
+            }
+
+            if ( tocWriter != null ) {
+                tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+            }
+
+            writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+        } else {
+            if ( tocWriter != null ) {
+                tocWriter.addBlockOffset(rawOutStream.getBytesWritten());
+            }
+
+            writableStream = new BufferedOutputStream(rawOutStream, 65536);
+        }
+
         this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
         this.out = new DataOutputStream(byteCountingOut);
     }
-    
+
 
     @Override
     public synchronized long writeRecord(final ProvenanceEventRecord record, long recordIdentifier) throws IOException {
@@ -136,16 +136,16 @@ public class StandardRecordWriter implements RecordWriter {
 
         // add a new block to the TOC if needed.
         if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) {
-        	lastBlockOffset = startBytes;
-        	
-        	if ( compressed ) {
-        		// because of the way that GZIPOutputStream works, we need to call close() on it in order for it
-        		// to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
-        		// the underlying OutputStream in a NonCloseableOutputStream
-        		resetWriteStream();
-        	}
+            lastBlockOffset = startBytes;
+
+            if ( compressed ) {
+                // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+                // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+                // the underlying OutputStream in a NonCloseableOutputStream
+                resetWriteStream();
+            }
         }
-        
+
         out.writeLong(recordIdentifier);
         out.writeUTF(record.getEventType().name());
         out.writeLong(record.getEventTime());
@@ -175,7 +175,7 @@ public class StandardRecordWriter implements RecordWriter {
             writeLongNullableString(out, entry.getValue());
         }
 
-        // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. 
+        // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
         if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
             out.writeBoolean(true);
             out.writeUTF(record.getContentClaimContainer());
@@ -261,24 +261,24 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public synchronized void close() throws IOException {
-    	logger.trace("Closing Record Writer for {}", file.getName());
-    	
+        logger.trace("Closing Record Writer for {}", file.getName());
+
         lock();
         try {
-        	try {
-        		out.flush();
-        		out.close();
-        	} finally {
-        		rawOutStream.close();
-            
-	            if ( tocWriter != null ) {
-	            	tocWriter.close();
-	            }
-        	}
+            try {
+                out.flush();
+                out.close();
+            } finally {
+                rawOutStream.close();
+
+                if ( tocWriter != null ) {
+                    tocWriter.close();
+                }
+            }
         } finally {
             unlock();
         }
-        
+
     }
 
     @Override
@@ -308,14 +308,14 @@ public class StandardRecordWriter implements RecordWriter {
 
     @Override
     public void sync() throws IOException {
-    	if ( tocWriter != null ) {
-    		tocWriter.sync();
-    	}
-    	fos.getFD().sync();
+        if ( tocWriter != null ) {
+            tocWriter.sync();
+        }
+        fos.getFD().sync();
     }
-    
+
     @Override
     public TocWriter getTocWriter() {
-    	return tocWriter;
+        return tocWriter;
     }
 }


[2/4] incubator-nifi git commit: NIFI-527: Code cleanup

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
index 8c266d1..0ffa5e6 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/ExpirationAction.java
@@ -25,9 +25,9 @@ public interface ExpirationAction {
      * Performs some action against the given File and returns the new File that
      * contains the modified version
      *
-     * @param expiredFile
-     * @return
-     * @throws IOException
+     * @param expiredFile the file that was expired
+     * @return the new file after the file has been renamed, or the expiredFile if the file was not renamed
+     * @throws IOException if there was an IO problem
      */
     File execute(File expiredFile) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
index 7db04aa..70bf36e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DeleteIndexAction.java
@@ -49,9 +49,9 @@ public class DeleteIndexAction implements ExpirationAction {
         long numDeleted = 0;
         long maxEventId = -1L;
         try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, repository.getAllLogFiles())) {
-        	maxEventId = reader.getMaxEventId();
+            maxEventId = reader.getMaxEventId();
         } catch (final IOException ioe) {
-        	logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
+            logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
         }
 
         // remove the records from the index
@@ -68,19 +68,19 @@ public class DeleteIndexAction implements ExpirationAction {
                 deleteDir = (docsLeft <= 0);
                 logger.debug("After expiring {}, there are {} docs left for index {}", expiredFile, docsLeft, indexingDirectory);
             } finally {
-            	indexManager.returnIndexWriter(indexingDirectory, writer);
+                indexManager.returnIndexWriter(indexingDirectory, writer);
             }
 
             // we've confirmed that all documents have been removed. Delete the index directory.
             if (deleteDir) {
-            	indexManager.removeIndex(indexingDirectory);
+                indexManager.removeIndex(indexingDirectory);
                 indexConfiguration.removeIndexDirectory(indexingDirectory);
-                
+
                 deleteDirectory(indexingDirectory);
                 logger.info("Removed empty index directory {}", indexingDirectory);
             }
         }
-        
+
         // Update the minimum index to 1 more than the max Event ID in this file.
         if (maxEventId > -1L) {
             indexConfiguration.setMinIdIndexed(maxEventId + 1L);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index 5a77f42..98137fb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -45,12 +45,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DocsReader {
-	private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
-	
+    private final Logger logger = LoggerFactory.getLogger(DocsReader.class);
+
     public DocsReader(final List<File> storageDirectories) {
     }
 
-    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
+    public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
+            final AtomicInteger retrievalCount, final int maxResults) throws IOException {
         if (retrievalCount.get() >= maxResults) {
             return Collections.emptySet();
         }
@@ -73,42 +74,42 @@ public class DocsReader {
         return read(docs, allProvenanceLogFiles);
     }
 
-    
+
     private long getByteOffset(final Document d, final RecordReader reader) {
         final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
         if ( blockField != null ) {
-        	final int blockIndex = blockField.numericValue().intValue();
-        	final TocReader tocReader = reader.getTocReader();
-        	return tocReader.getBlockOffset(blockIndex);
+            final int blockIndex = blockField.numericValue().intValue();
+            final TocReader tocReader = reader.getTocReader();
+            return tocReader.getBlockOffset(blockIndex);
         }
-        
-    	return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
+
+        return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
     }
-    
-    
+
+
     private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
-    	IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
-    	if ( blockField == null ) {
-    		reader.skipTo(getByteOffset(d, reader));
-    	} else {
-    		reader.skipToBlock(blockField.numericValue().intValue());
-    	}
-    	
+        IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
+        if ( blockField == null ) {
+            reader.skipTo(getByteOffset(d, reader));
+        } else {
+            reader.skipToBlock(blockField.numericValue().intValue());
+        }
+
         StandardProvenanceEventRecord record;
         while ( (record = reader.nextRecord()) != null) {
-        	IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
-        	if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
-        		break;
-        	}
+            IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
+            if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+                break;
+            }
         }
-        
+
         if ( record == null ) {
-        	throw new IOException("Failed to find Provenance Event " + d);
+            throw new IOException("Failed to find Provenance Event " + d);
         } else {
-        	return record;
+            return record;
         }
     }
-    
+
 
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
         LuceneUtil.sortDocsForRetrieval(docs);
@@ -119,23 +120,23 @@ public class DocsReader {
 
         final long start = System.nanoTime();
         int logFileCount = 0;
-        
+
         final Set<String> storageFilesToSkip = new HashSet<>();
-        
+
         try {
             for (final Document d : docs) {
                 final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
                 if ( storageFilesToSkip.contains(storageFilename) ) {
-                	continue;
+                    continue;
                 }
-                
+
                 try {
                     if (reader != null && storageFilename.equals(lastStorageFilename)) {
-                       	matchingRecords.add(getRecord(d, reader));
+                        matchingRecords.add(getRecord(d, reader));
                     } else {
-                    	logger.debug("Opening log file {}", storageFilename);
-                    	
-                    	logFileCount++;
+                        logger.debug("Opening log file {}", storageFilename);
+
+                        logFileCount++;
                         if (reader != null) {
                             reader.close();
                         }
@@ -143,20 +144,20 @@ public class DocsReader {
                         List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
                             logger.warn("Could not find Provenance Log File with basename {} in the "
-                            		+ "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+                                    + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
                             storageFilesToSkip.add(storageFilename);
                             continue;
                         }
 
                         if (potentialFiles.size() > 1) {
-                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " + 
-                            		storageFilename + " in the Provenance Repository");
+                            throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
+                                    storageFilename + " in the Provenance Repository");
                         }
 
                         for (final File file : potentialFiles) {
                             try {
-                            	reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
-                               	matchingRecords.add(getRecord(d, reader));
+                                reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
+                                matchingRecords.add(getRecord(d, reader));
                             } catch (final IOException e) {
                                 throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
index 3943504..9c3ec31 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java
@@ -41,65 +41,65 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IndexManager implements Closeable {
-	private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
-	
-	private final Lock lock = new ReentrantLock();
-	private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
-	private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
-	
-	
-	public void removeIndex(final File indexDirectory) {
-		final File absoluteFile = indexDirectory.getAbsoluteFile();
-		logger.info("Removing index {}", indexDirectory);
-		
-		lock.lock();
-		try {
-			final IndexWriterCount count = writerCounts.remove(absoluteFile);
-			if ( count != null ) {
-				try {
-					count.close();
-				} catch (final IOException ioe) {
-					logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
-					if ( logger.isDebugEnabled() ) {
-						logger.warn("", ioe);
-					}
-				}
-			}
-			
-			for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
-				for ( final ActiveIndexSearcher searcher : searcherList ) {
-					try {
-						searcher.close();
-					} catch (final IOException ioe) {
-						logger.warn("Failed to close Index Searcher {} for {} due to {}", 
-								searcher.getSearcher(), absoluteFile, ioe);
-						if ( logger.isDebugEnabled() ) {
-							logger.warn("", ioe);
-						}
-					}
-				}
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-	
-	public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
-		final File absoluteFile = indexingDirectory.getAbsoluteFile();
-		logger.debug("Borrowing index writer for {}", indexingDirectory);
-		
-		lock.lock();
-		try {
-			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-			if ( writerCount == null ) {
-				final List<Closeable> closeables = new ArrayList<>();
+    private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
+
+    private final Lock lock = new ReentrantLock();
+    private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
+    private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
+
+
+    public void removeIndex(final File indexDirectory) {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.info("Removing index {}", indexDirectory);
+
+        lock.lock();
+        try {
+            final IndexWriterCount count = writerCounts.remove(absoluteFile);
+            if ( count != null ) {
+                try {
+                    count.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
+                }
+            }
+
+            for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
+                for ( final ActiveIndexSearcher searcher : searcherList ) {
+                    try {
+                        searcher.close();
+                    } catch (final IOException ioe) {
+                        logger.warn("Failed to close Index Searcher {} for {} due to {}",
+                                searcher.getSearcher(), absoluteFile, ioe);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.warn("", ioe);
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.debug("Borrowing index writer for {}", indexingDirectory);
+
+        lock.lock();
+        try {
+            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            if ( writerCount == null ) {
+                final List<Closeable> closeables = new ArrayList<>();
                 final Directory directory = FSDirectory.open(indexingDirectory);
                 closeables.add(directory);
-                
+
                 try {
-                	final Analyzer analyzer = new StandardAnalyzer();
-                	closeables.add(analyzer);
-                	
+                    final Analyzer analyzer = new StandardAnalyzer();
+                    closeables.add(analyzer);
+
                     final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
                     config.setWriteLockTimeout(300000L);
 
@@ -107,361 +107,361 @@ public class IndexManager implements Closeable {
                     writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
                     logger.debug("Providing new index writer for {}", indexingDirectory);
                 } catch (final IOException ioe) {
-                	for ( final Closeable closeable : closeables ) {
-                		try {
-                			closeable.close();
-                		} catch (final IOException ioe2) {
-                			ioe.addSuppressed(ioe2);
-                		}
-                	}
-                	
-                	throw ioe;
+                    for ( final Closeable closeable : closeables ) {
+                        try {
+                            closeable.close();
+                        } catch (final IOException ioe2) {
+                            ioe.addSuppressed(ioe2);
+                        }
+                    }
+
+                    throw ioe;
                 }
-                
+
                 writerCounts.put(absoluteFile, writerCount);
-			} else {
-				logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
-				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-			}
-			
-			return writerCount.getWriter();
-		} finally {
-			lock.unlock();
-		}
-	}
-	
-	public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
-		final File absoluteFile = indexingDirectory.getAbsoluteFile();
-		logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
-		
-		lock.lock();
-		try {
-			IndexWriterCount count = writerCounts.remove(absoluteFile);
-			
-			try {
-				if ( count == null ) {
-					logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
-							+ "This could potentially lead to a resource leak", writer, indexingDirectory);
-					writer.close();
-				} else if ( count.getCount() <= 1 ) {
-					// we are finished with this writer.
-					logger.debug("Closing Index Writer for {}", indexingDirectory);
-					count.close();
-				} else {
-					// decrement the count.
-					logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
-					writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
-				}
-			} catch (final IOException ioe) {
-				logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
-				if ( logger.isDebugEnabled() ) {
-					logger.warn("", ioe);
-				}
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	
-	public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
-		final File absoluteFile = indexDir.getAbsoluteFile();
-		logger.debug("Borrowing index searcher for {}", indexDir);
-		
-		lock.lock();
-		try {
-			// check if we already have a reader cached.
-			List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-			if ( currentlyCached == null ) {
-				currentlyCached = new ArrayList<>();
-				activeSearchers.put(absoluteFile, currentlyCached);
-			} else {
-				// keep track of any searchers that have been closed so that we can remove them
-				// from our cache later.
-				final Set<ActiveIndexSearcher> expired = new HashSet<>();
-				
-				try {
-					for ( final ActiveIndexSearcher searcher : currentlyCached ) {
-						if ( searcher.isCache() ) {
-							final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
-							if ( refCount <= 0 ) {
-								// if refCount == 0, then the reader has been closed, so we need to discard the searcher
-								logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
-									+ "removing cached searcher", absoluteFile, refCount);
-								expired.add(searcher);
-								continue;
-							}
-							
-							logger.debug("Providing previously cached index searcher for {}", indexDir);
-							return searcher.getSearcher();
-						}
-					}
-				} finally {
-					// if we have any expired index searchers, we need to close them and remove them
-					// from the cache so that we don't try to use them again later.
-					for ( final ActiveIndexSearcher searcher : expired ) {
-						try {
-							searcher.close();
-						} catch (final Exception e) {
-							logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
-						}
-						
-						currentlyCached.remove(searcher);
-					}
-				}
-			}
-			
-			IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-			if ( writerCount == null ) {
-				final Directory directory = FSDirectory.open(absoluteFile);
-				logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
-				
-				try {
-					final DirectoryReader directoryReader = DirectoryReader.open(directory);
-					final IndexSearcher searcher = new IndexSearcher(directoryReader);
-					
-					// we want to cache the searcher that we create, since it's just a reader.
-					final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
-					currentlyCached.add(cached);
-					
-					return cached.getSearcher();
-				} catch (final IOException e) {
-					try {
-						directory.close();
-					} catch (final IOException ioe) {
-						e.addSuppressed(ioe);
-					}
-					
-					throw e;
-				}
-			} else {
-				logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
-						+ "counter to {}", indexDir, writerCount.getCount() + 1);
-
-				// increment the writer count to ensure that it's kept open.
-				writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-						writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
-				
-				// create a new Index Searcher from the writer so that we don't have an issue with trying
-				// to read from a directory that's locked. If we get the "no segments* file found" with
-				// Lucene, this indicates that an IndexWriter already has the directory open.
-				final IndexWriter writer = writerCount.getWriter();
-				final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
-				final IndexSearcher searcher = new IndexSearcher(directoryReader);
-				
-				// we don't want to cache this searcher because it's based on a writer, so we want to get
-				// new values the next time that we search.
-				final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
-				
-				currentlyCached.add(activeSearcher);
-				return activeSearcher.getSearcher();
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-	
-	
-	public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
-		final File absoluteFile = indexDirectory.getAbsoluteFile();
-		logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
-		
-		lock.lock();
-		try {
-			// check if we already have a reader cached.
-			List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
-			if ( currentlyCached == null ) {
-				logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
-						+ "result in a resource leak", indexDirectory);
-				return;
-			}
-			
-			final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
-			while (itr.hasNext()) {
-				final ActiveIndexSearcher activeSearcher = itr.next();
-				if ( activeSearcher.getSearcher().equals(searcher) ) {
-					if ( activeSearcher.isCache() ) {
-						// the searcher is cached. Just leave it open.
-						logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
-						return;
-					} else {
-						// searcher is not cached. It was created from a writer, and we want
-						// the newest updates the next time that we get a searcher, so we will
-						// go ahead and close this one out.
-						itr.remove();
-						
-						// decrement the writer count because we incremented it when creating the searcher
-						final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
-						if ( writerCount != null ) {
-							if ( writerCount.getCount() <= 1 ) {
-								try {
-									logger.debug("Index searcher for {} is not cached. Writer count is "
-											+ "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
-									
-									writerCount.close();
-								} catch (final IOException ioe) {
-									logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
-									if ( logger.isDebugEnabled() ) {
-										logger.warn("", ioe);
-									}
-								}
-							} else {
-								logger.debug("Index searcher for {} is not cached. Writer count is decremented "
-										+ "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
-								
-								writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
-									writerCount.getAnalyzer(), writerCount.getDirectory(), 
-									writerCount.getCount() - 1));
-							}
-						}
-
-						try {
-							logger.debug("Closing Index Searcher for {}", indexDirectory);
-							activeSearcher.close();
-						} catch (final IOException ioe) {
-							logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
-							if ( logger.isDebugEnabled() ) {
-								logger.warn("", ioe);
-							}
-						}
-					}
-				}
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-	
-	@Override
-	public void close() throws IOException {
-		logger.debug("Closing Index Manager");
-		
-		lock.lock();
-		try {
-			IOException ioe = null;
-			
-			for ( final IndexWriterCount count : writerCounts.values() ) {
-				try {
-					count.close();
-				} catch (final IOException e) {
-					if ( ioe == null ) {
-						ioe = e;
-					} else {
-						ioe.addSuppressed(e);
-					}
-				}
-			}
-			
-			for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
-				for (final ActiveIndexSearcher searcher : searcherList) {
-					try {
-						searcher.close();
-					} catch (final IOException e) {
-						if ( ioe == null ) {
-							ioe = e;
-						} else {
-							ioe.addSuppressed(e);
-						}
-					}
-				}
-			}
-			
-			if ( ioe != null ) {
-				throw ioe;
-			}
-		} finally {
-			lock.unlock();
-		}
-	}
-
-	
-	private static void close(final Closeable... closeables) throws IOException {
-		IOException ioe = null;
-		for ( final Closeable closeable : closeables ) {
-			if ( closeable == null ) {
-				continue;
-			}
-			
-			try {
-				closeable.close();
-			} catch (final IOException e) {
-				if ( ioe == null ) {
-					ioe = e;
-				} else {
-					ioe.addSuppressed(e);
-				}
-			}
-		}
-		
-		if ( ioe != null ) {
-			throw ioe;
-		}
-	}
-	
-	
-	private static class ActiveIndexSearcher implements Closeable {
-		private final IndexSearcher searcher;
-		private final DirectoryReader directoryReader;
-		private final Directory directory;
-		private final boolean cache;
-		
-		public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, 
-				Directory directory, final boolean cache) {
-			this.searcher = searcher;
-			this.directoryReader = directoryReader;
-			this.directory = directory;
-			this.cache = cache;
-		}
-
-		public boolean isCache() {
-			return cache;
-		}
-
-		public IndexSearcher getSearcher() {
-			return searcher;
-		}
-		
-		@Override
-		public void close() throws IOException {
-			IndexManager.close(directoryReader, directory);
-		}
-	}
-	
-	
-	private static class IndexWriterCount implements Closeable {
-		private final IndexWriter writer;
-		private final Analyzer analyzer;
-		private final Directory directory;
-		private final int count;
-		
-		public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
-			this.writer = writer;
-			this.analyzer = analyzer;
-			this.directory = directory;
-			this.count = count;
-		}
-
-		public Analyzer getAnalyzer() {
-			return analyzer;
-		}
-
-		public Directory getDirectory() {
-			return directory;
-		}
-
-		public IndexWriter getWriter() {
-			return writer;
-		}
-
-		public int getCount() {
-			return count;
-		}
-
-		@Override
-		public void close() throws IOException {
-			IndexManager.close(writer, analyzer, directory);
-		}
-	}
+            } else {
+                logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
+                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+            }
+
+            return writerCount.getWriter();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
+        final File absoluteFile = indexingDirectory.getAbsoluteFile();
+        logger.debug("Returning Index Writer for {} to IndexManager", indexingDirectory);
+
+        lock.lock();
+        try {
+            IndexWriterCount count = writerCounts.remove(absoluteFile);
+
+            try {
+                if ( count == null ) {
+                    logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+                            + "This could potentially lead to a resource leak", writer, indexingDirectory);
+                    writer.close();
+                } else if ( count.getCount() <= 1 ) {
+                    // we are finished with this writer.
+                    logger.debug("Closing Index Writer for {}", indexingDirectory);
+                    count.close();
+                } else {
+                    // decrement the count.
+                    logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
+                    writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
+                }
+            } catch (final IOException ioe) {
+                logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
+                if ( logger.isDebugEnabled() ) {
+                    logger.warn("", ioe);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
+        final File absoluteFile = indexDir.getAbsoluteFile();
+        logger.debug("Borrowing index searcher for {}", indexDir);
+
+        lock.lock();
+        try {
+            // check if we already have a reader cached.
+            List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+            if ( currentlyCached == null ) {
+                currentlyCached = new ArrayList<>();
+                activeSearchers.put(absoluteFile, currentlyCached);
+            } else {
+                // keep track of any searchers that have been closed so that we can remove them
+                // from our cache later.
+                final Set<ActiveIndexSearcher> expired = new HashSet<>();
+
+                try {
+                    for ( final ActiveIndexSearcher searcher : currentlyCached ) {
+                        if ( searcher.isCache() ) {
+                            final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+                            if ( refCount <= 0 ) {
+                                // if refCount == 0, then the reader has been closed, so we need to discard the searcher
+                                logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+                                        + "removing cached searcher", absoluteFile, refCount);
+                                expired.add(searcher);
+                                continue;
+                            }
+
+                            logger.debug("Providing previously cached index searcher for {}", indexDir);
+                            return searcher.getSearcher();
+                        }
+                    }
+                } finally {
+                    // if we have any expired index searchers, we need to close them and remove them
+                    // from the cache so that we don't try to use them again later.
+                    for ( final ActiveIndexSearcher searcher : expired ) {
+                        try {
+                            searcher.close();
+                        } catch (final Exception e) {
+                            logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
+                        }
+
+                        currentlyCached.remove(searcher);
+                    }
+                }
+            }
+
+            IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+            if ( writerCount == null ) {
+                final Directory directory = FSDirectory.open(absoluteFile);
+                logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
+
+                try {
+                    final DirectoryReader directoryReader = DirectoryReader.open(directory);
+                    final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+                    // we want to cache the searcher that we create, since it's just a reader.
+                    final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, directoryReader, directory, true);
+                    currentlyCached.add(cached);
+
+                    return cached.getSearcher();
+                } catch (final IOException e) {
+                    try {
+                        directory.close();
+                    } catch (final IOException ioe) {
+                        e.addSuppressed(ioe);
+                    }
+
+                    throw e;
+                }
+            } else {
+                logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+                        + "counter to {}", indexDir, writerCount.getCount() + 1);
+
+                // increment the writer count to ensure that it's kept open.
+                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                        writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
+
+                // create a new Index Searcher from the writer so that we don't have an issue with trying
+                // to read from a directory that's locked. If we get the "no segments* file found" with
+                // Lucene, this indicates that an IndexWriter already has the directory open.
+                final IndexWriter writer = writerCount.getWriter();
+                final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
+                final IndexSearcher searcher = new IndexSearcher(directoryReader);
+
+                // we don't want to cache this searcher because it's based on a writer, so we want to get
+                // new values the next time that we search.
+                final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, directoryReader, null, false);
+
+                currentlyCached.add(activeSearcher);
+                return activeSearcher.getSearcher();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
+        final File absoluteFile = indexDirectory.getAbsoluteFile();
+        logger.debug("Returning index searcher for {} to IndexManager", indexDirectory);
+
+        lock.lock();
+        try {
+            // check if we already have a reader cached.
+            List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
+            if ( currentlyCached == null ) {
+                logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+                        + "result in a resource leak", indexDirectory);
+                return;
+            }
+
+            final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
+            while (itr.hasNext()) {
+                final ActiveIndexSearcher activeSearcher = itr.next();
+                if ( activeSearcher.getSearcher().equals(searcher) ) {
+                    if ( activeSearcher.isCache() ) {
+                        // the searcher is cached. Just leave it open.
+                        logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
+                        return;
+                    } else {
+                        // searcher is not cached. It was created from a writer, and we want
+                        // the newest updates the next time that we get a searcher, so we will
+                        // go ahead and close this one out.
+                        itr.remove();
+
+                        // decrement the writer count because we incremented it when creating the searcher
+                        final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
+                        if ( writerCount != null ) {
+                            if ( writerCount.getCount() <= 1 ) {
+                                try {
+                                    logger.debug("Index searcher for {} is not cached. Writer count is "
+                                            + "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
+
+                                    writerCount.close();
+                                } catch (final IOException ioe) {
+                                    logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
+                                    if ( logger.isDebugEnabled() ) {
+                                        logger.warn("", ioe);
+                                    }
+                                }
+                            } else {
+                                logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+                                        + "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
+
+                                writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
+                                        writerCount.getAnalyzer(), writerCount.getDirectory(),
+                                        writerCount.getCount() - 1));
+                            }
+                        }
+
+                        try {
+                            logger.debug("Closing Index Searcher for {}", indexDirectory);
+                            activeSearcher.close();
+                        } catch (final IOException ioe) {
+                            logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.warn("", ioe);
+                            }
+                        }
+                    }
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        logger.debug("Closing Index Manager");
+
+        lock.lock();
+        try {
+            IOException ioe = null;
+
+            for ( final IndexWriterCount count : writerCounts.values() ) {
+                try {
+                    count.close();
+                } catch (final IOException e) {
+                    if ( ioe == null ) {
+                        ioe = e;
+                    } else {
+                        ioe.addSuppressed(e);
+                    }
+                }
+            }
+
+            for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
+                for (final ActiveIndexSearcher searcher : searcherList) {
+                    try {
+                        searcher.close();
+                    } catch (final IOException e) {
+                        if ( ioe == null ) {
+                            ioe = e;
+                        } else {
+                            ioe.addSuppressed(e);
+                        }
+                    }
+                }
+            }
+
+            if ( ioe != null ) {
+                throw ioe;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    private static void close(final Closeable... closeables) throws IOException {
+        IOException ioe = null;
+        for ( final Closeable closeable : closeables ) {
+            if ( closeable == null ) {
+                continue;
+            }
+
+            try {
+                closeable.close();
+            } catch (final IOException e) {
+                if ( ioe == null ) {
+                    ioe = e;
+                } else {
+                    ioe.addSuppressed(e);
+                }
+            }
+        }
+
+        if ( ioe != null ) {
+            throw ioe;
+        }
+    }
+
+
+    private static class ActiveIndexSearcher implements Closeable {
+        private final IndexSearcher searcher;
+        private final DirectoryReader directoryReader;
+        private final Directory directory;
+        private final boolean cache;
+
+        public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
+                Directory directory, final boolean cache) {
+            this.searcher = searcher;
+            this.directoryReader = directoryReader;
+            this.directory = directory;
+            this.cache = cache;
+        }
+
+        public boolean isCache() {
+            return cache;
+        }
+
+        public IndexSearcher getSearcher() {
+            return searcher;
+        }
+
+        @Override
+        public void close() throws IOException {
+            IndexManager.close(directoryReader, directory);
+        }
+    }
+
+
+    private static class IndexWriterCount implements Closeable {
+        private final IndexWriter writer;
+        private final Analyzer analyzer;
+        private final Directory directory;
+        private final int count;
+
+        public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
+            this.writer = writer;
+            this.analyzer = analyzer;
+            this.directory = directory;
+            this.count = count;
+        }
+
+        public Analyzer getAnalyzer() {
+            return analyzer;
+        }
+
+        public Directory getDirectory() {
+            return directory;
+        }
+
+        public IndexWriter getWriter() {
+            return writer;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        @Override
+        public void close() throws IOException {
+            IndexManager.close(writer, analyzer, directory);
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index dcb6e08..53869f4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IndexSearch {
-	private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
+    private final Logger logger = LoggerFactory.getLogger(IndexSearch.class);
     private final PersistentProvenanceRepository repository;
     private final File indexDirectory;
     private final IndexManager indexManager;
@@ -65,17 +65,17 @@ public class IndexSearch {
         final long start = System.nanoTime();
         IndexSearcher searcher = null;
         try {
-        	searcher = indexManager.borrowIndexSearcher(indexDirectory);
+            searcher = indexManager.borrowIndexSearcher(indexDirectory);
             final long searchStartNanos = System.nanoTime();
             final long openSearcherNanos = searchStartNanos - start;
-            
+
             final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
             final long finishSearch = System.nanoTime();
             final long searchNanos = finishSearch - searchStartNanos;
-            
-            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this, 
-            		TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
-            
+
+            logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
+                    TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+
             if (topDocs.totalHits == 0) {
                 sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
                 return sqr;
@@ -83,31 +83,31 @@ public class IndexSearch {
 
             final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
             matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount, provenanceQuery.getMaxResults());
-            
+
             final long readRecordsNanos = System.nanoTime() - finishSearch;
             logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
-            
+
             sqr.update(matchingRecords, topDocs.totalHits);
             return sqr;
         } catch (final FileNotFoundException e) {
             // nothing has been indexed yet, or the data has already aged off
-        	logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
-        	if ( logger.isDebugEnabled() ) {
-        		logger.warn("", e);
-        	}
-        	
+            logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
+            if ( logger.isDebugEnabled() ) {
+                logger.warn("", e);
+            }
+
             sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
             return sqr;
         } finally {
-        	if ( searcher != null ) {
-        		indexManager.returnIndexSearcher(indexDirectory, searcher);
-        	}
+            if ( searcher != null ) {
+                indexManager.returnIndexSearcher(indexDirectory, searcher);
+            }
         }
     }
 
-    
+
     @Override
     public String toString() {
-    	return "IndexSearcher[" + indexDirectory + "]";
+        return "IndexSearcher[" + indexDirectory + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index 5e87913..46be391 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -16,50 +16,30 @@
  */
 package org.apache.nifi.provenance.lucene;
 
-import java.io.EOFException;
-import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.IntField;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.provenance.IndexConfiguration;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.provenance.rollover.RolloverAction;
 import org.apache.nifi.provenance.search.SearchableField;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class IndexingAction implements RolloverAction {
-
-    private final PersistentProvenanceRepository repository;
+public class IndexingAction {
     private final Set<SearchableField> nonAttributeSearchableFields;
     private final Set<SearchableField> attributeSearchableFields;
-    private final IndexConfiguration indexConfiguration;
-    private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
-
-    public IndexingAction(final PersistentProvenanceRepository repo, final IndexConfiguration indexConfig) {
-        repository = repo;
-        indexConfiguration = indexConfig;
 
+    public IndexingAction(final PersistentProvenanceRepository repo) {
         attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableAttributes()));
         nonAttributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(repo.getConfiguration().getSearchableFields()));
     }
@@ -72,7 +52,7 @@ public class IndexingAction implements RolloverAction {
         doc.add(new StringField(field.getSearchableFieldName(), value.toLowerCase(), store));
     }
 
-    
+
     public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
         final Map<String, String> attributes = record.getAttributes();
 
@@ -105,14 +85,14 @@ public class IndexingAction implements RolloverAction {
             doc.add(new LongField(SearchableFields.EventTime.getSearchableFieldName(), record.getEventTime(), Store.NO));
             doc.add(new LongField(SearchableFields.FileSize.getSearchableFieldName(), record.getFileSize(), Store.NO));
             doc.add(new StringField(FieldNames.STORAGE_FILENAME, storageFilename, Store.YES));
-            
+
             if ( blockIndex == null ) {
-            	doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
+                doc.add(new LongField(FieldNames.STORAGE_FILE_OFFSET, record.getStorageByteOffset(), Store.YES));
             } else {
-	            doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
-	            doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
+                doc.add(new IntField(FieldNames.BLOCK_INDEX, blockIndex, Store.YES));
+                doc.add(new LongField(SearchableFields.Identifier.getSearchableFieldName(), record.getEventId(), Store.YES));
             }
-            
+
             for (final String lineageIdentifier : record.getLineageIdentifiers()) {
                 addField(doc, SearchableFields.LineageIdentifier, lineageIdentifier, Store.NO);
             }
@@ -150,87 +130,4 @@ public class IndexingAction implements RolloverAction {
             indexWriter.addDocument(doc);
         }
     }
-    
-    @Override
-    public File execute(final File fileRolledOver) throws IOException {
-        final File indexingDirectory = indexConfiguration.getWritableIndexDirectory(fileRolledOver);
-        int indexCount = 0;
-        long maxId = -1L;
-
-        try (final Directory directory = FSDirectory.open(indexingDirectory);
-                final Analyzer analyzer = new StandardAnalyzer()) {
-
-            final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
-            config.setWriteLockTimeout(300000L);
-
-            try (final IndexWriter indexWriter = new IndexWriter(directory, config);
-                    final RecordReader reader = RecordReaders.newRecordReader(fileRolledOver, repository.getAllLogFiles())) {
-                StandardProvenanceEventRecord record;
-                while (true) {
-                	final Integer blockIndex;
-                	if ( reader.isBlockIndexAvailable() ) {
-                		blockIndex = reader.getBlockIndex();
-                	} else {
-                		blockIndex = null;
-                	}
-                	
-                    try {
-                        record = reader.nextRecord();
-                    } catch (final EOFException eof) {
-                        // system was restarted while writing to the log file. Nothing we can do here, so ignore this record.
-                        // On system restart, the FlowFiles should be back in their "original" queues, so the events will be re-created
-                        // when the data is re-processed
-                        break;
-                    }
-
-                    if (record == null) {
-                        break;
-                    }
-
-                    maxId = record.getEventId();
-
-                    index(record, indexWriter, blockIndex);
-                    indexCount++;
-                }
-
-                indexWriter.commit();
-            } catch (final EOFException eof) {
-                // nothing in the file. Move on.
-            }
-        } finally {
-            if (maxId >= -1) {
-                indexConfiguration.setMaxIdIndexed(maxId);
-            }
-        }
-
-        final File newFile = new File(fileRolledOver.getParent(),
-                LuceneUtil.substringBeforeLast(fileRolledOver.getName(), ".")
-                + ".indexed."
-                + LuceneUtil.substringAfterLast(fileRolledOver.getName(), "."));
-
-        boolean renamed = false;
-        for (int i = 0; i < 10 && !renamed; i++) {
-            renamed = fileRolledOver.renameTo(newFile);
-            if (!renamed) {
-                try {
-                    Thread.sleep(25L);
-                } catch (final InterruptedException e) {
-                }
-            }
-        }
-
-        if (renamed) {
-            logger.info("Finished indexing Provenance Log File {} to index {} with {} records indexed and renamed file to {}",
-                    fileRolledOver, indexingDirectory, indexCount, newFile);
-            return newFile;
-        } else {
-            logger.warn("Finished indexing Provenance Log File {} with {} records indexed but failed to rename file to {}; indexed {} records", new Object[]{fileRolledOver, indexCount, newFile, indexCount});
-            return fileRolledOver;
-        }
-    }
-
-    @Override
-    public boolean hasBeenPerformed(final File fileRolledOver) {
-        return fileRolledOver.getName().contains(".indexed.");
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index 54cde15..3f75c00 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -48,7 +48,8 @@ public class LineageQuery {
     public static final int MAX_LINEAGE_UUIDS = 100;
     private static final Logger logger = LoggerFactory.getLogger(LineageQuery.class);
 
-    public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory, final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
+    public static Set<ProvenanceEventRecord> computeLineageForFlowFiles(final PersistentProvenanceRepository repo, final File indexDirectory,
+            final String lineageIdentifier, final Collection<String> flowFileUuids) throws IOException {
         if (requireNonNull(flowFileUuids).size() > MAX_LINEAGE_UUIDS) {
             throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
         }
@@ -99,7 +100,8 @@ public class LineageQuery {
             final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
             final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, indexReader, repo.getAllLogFiles(), new AtomicInteger(0), Integer.MAX_VALUE);
             final long readDocsEnd = System.nanoTime();
-            logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis", TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
+            logger.debug("Finished Lineage Query; Lucene search took {} millis, reading records took {} millis",
+                    TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
 
             return recs;
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
index 59dc10b..c622ea1 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LuceneUtil.java
@@ -78,16 +78,16 @@ public class LuceneUtil {
         final String searchString = baseName + ".";
         for (final Path path : allProvenanceLogs) {
             if (path.toFile().getName().startsWith(searchString)) {
-            	final File file = path.toFile();
-            	if ( file.exists() ) {
-            		matchingFiles.add(file);
-            	} else {
-            		final File dir = file.getParentFile();
-            		final File gzFile = new File(dir, file.getName() + ".gz");
-            		if ( gzFile.exists() ) {
-            			matchingFiles.add(gzFile);
-            		}
-            	}
+                final File file = path.toFile();
+                if ( file.exists() ) {
+                    matchingFiles.add(file);
+                } else {
+                    final File dir = file.getParentFile();
+                    final File gzFile = new File(dir, file.getName() + ".gz");
+                    if ( gzFile.exists() ) {
+                        matchingFiles.add(gzFile);
+                    }
+                }
             }
         }
 
@@ -144,16 +144,16 @@ public class LuceneUtil {
                 final IndexableField fileOffset1 = o1.getField(FieldNames.BLOCK_INDEX);
                 final IndexableField fileOffset2 = o1.getField(FieldNames.BLOCK_INDEX);
                 if ( fileOffset1 != null && fileOffset2 != null ) {
-                	final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
-                	if ( blockIndexResult != 0 ) {
-                		return blockIndexResult;
-                	}
-                	
-                	final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
-                	final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
-                	return Long.compare(eventId1, eventId2);
+                    final int blockIndexResult = Long.compare(fileOffset1.numericValue().longValue(), fileOffset2.numericValue().longValue());
+                    if ( blockIndexResult != 0 ) {
+                        return blockIndexResult;
+                    }
+
+                    final long eventId1 = o1.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                    final long eventId2 = o2.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
+                    return Long.compare(eventId1, eventId2);
                 }
-                
+
                 final long offset1 = o1.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 final long offset2 = o2.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
                 return Long.compare(offset1, offset2);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
deleted file mode 100644
index d014618..0000000
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/CompressionAction.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.rollover;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.provenance.lucene.IndexingAction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompressionAction implements RolloverAction {
-
-    private static final Logger logger = LoggerFactory.getLogger(IndexingAction.class);
-
-    @Override
-    public File execute(final File fileRolledOver) throws IOException {
-        final File gzFile = new File(fileRolledOver.getParent(), fileRolledOver.getName() + ".gz");
-        try (final FileInputStream in = new FileInputStream(fileRolledOver);
-                final OutputStream fos = new FileOutputStream(gzFile);
-                final GZIPOutputStream gzipOut = new GZIPOutputStream(fos, 1)) {
-            StreamUtils.copy(in, gzipOut);
-            in.getFD().sync();
-        }
-
-        boolean deleted = false;
-        for (int i = 0; i < 10 && !deleted; i++) {
-            deleted = fileRolledOver.delete();
-        }
-
-        logger.info("Finished compressing Provenance Log File {}", fileRolledOver);
-        return gzFile;
-    }
-
-    @Override
-    public boolean hasBeenPerformed(final File fileRolledOver) {
-        return fileRolledOver.getName().contains(".gz");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
deleted file mode 100644
index 33401e9..0000000
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/rollover/RolloverAction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance.rollover;
-
-import java.io.File;
-import java.io.IOException;
-
-public interface RolloverAction {
-
-    /**
-     * Performs some action against the given File and returns the new File that
-     * contains the modified version
-     *
-     * @param fileRolledOver
-     * @return
-     * @throws IOException
-     */
-    File execute(File fileRolledOver) throws IOException;
-
-    boolean hasBeenPerformed(File fileRolledOver);
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
index 8bdc88a..91c8222 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReader.java
@@ -24,75 +24,80 @@ import org.apache.nifi.provenance.toc.TocReader;
 
 public interface RecordReader extends Closeable {
 
-	/**
-	 * Returns the next record in the reader, or <code>null</code> if there is no more data available.
-	 * @return
-	 * @throws IOException
-	 */
+    /**
+     * Returns the next record in the reader, or <code>null</code> if there is no more data available.
+     * @return the next Provenance event in the stream
+     * @throws IOException if unable to read the next event from the stream
+     */
     StandardProvenanceEventRecord nextRecord() throws IOException;
 
     /**
      * Skips the specified number of bytes
-     * @param bytesToSkip
-     * @throws IOException
+     * @param bytesToSkip the number of bytes to skip ahead
+     * @throws IOException if unable to skip ahead the specified number of bytes (e.g., the stream does
+     *  not contain this many more bytes)
      */
     void skip(long bytesToSkip) throws IOException;
 
     /**
      * Skips to the specified byte offset in the underlying stream.
-     * @param position
+     * @param position the byte offset to skip to
      * @throws IOException if the underlying stream throws IOException, or if the reader has already
      * passed the specified byte offset
      */
     void skipTo(long position) throws IOException;
-    
+
     /**
      * Skips to the specified compression block
-     * 
-     * @param blockIndex
+     *
+     * @param blockIndex the index of the compression block to skip to
      * @throws IOException if the underlying stream throws IOException, or if the reader has already
      * read passed the specified compression block index
      * @throws IllegalStateException if the RecordReader does not have a TableOfContents associated with it
      */
     void skipToBlock(int blockIndex) throws IOException;
-    
+
     /**
      * Returns the block index that the Reader is currently reading from.
      * Note that the block index is incremented at the beginning of the {@link #nextRecord()}
-     * method. This means that this method will return the block from which the previous record was read, 
+     * method. This means that this method will return the block from which the previous record was read,
      * if calling {@link #nextRecord()} continually, not the block from which the next record will be read.
-     * @return
+     *
+     * @return the current block index
+     * @throws IllegalStateException if the reader is reading a provenance event file that does not contain
+     * a Table of Contents
      */
     int getBlockIndex();
-    
+
     /**
      * Returns <code>true</code> if the compression block index is available. It will be available
      * if and only if the reader is created with a TableOfContents
-     * 
-     * @return
+     *
+     * @return true if the reader is reading from an event file that has a Table of Contents
      */
     boolean isBlockIndexAvailable();
-    
+
     /**
      * Returns the {@link TocReader} that is used to keep track of compression blocks, if one exists,
      * <code>null</code> otherwise
-     * @return
+     *
+     * @return the TocReader if the underlying event file has a Table of Contents, <code>null</code> otherwise.
      */
     TocReader getTocReader();
-    
+
     /**
-     * Returns the number of bytes that have been consumed from the stream (read or skipped).
-     * @return
+     * @return the number of bytes that have been consumed from the stream (read or skipped).
      */
     long getBytesConsumed();
-    
+
     /**
      * Returns the ID of the last event in this record reader, or -1 if the reader has no records or
      * has already read through all records. Note: This method will consume the stream until the end,
      * so no more records will be available on this reader after calling this method.
-     * 
-     * @return
-     * @throws IOException
+     *
+     * @return the ID of the last event in this record reader, or -1 if the reader has no records or
+     * has already read through all records
+     * @throws IOException if unable to get id of the last event
      */
     long getMaxEventId() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index dff281c..cab5e6f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -37,75 +37,75 @@ public class RecordReaders {
         InputStream fis = null;
 
         try {
-	        if (!file.exists()) {
-	            if (provenanceLogFiles != null) {
-		            final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
-		            for (final Path path : provenanceLogFiles) {
-		                if (path.toFile().getName().startsWith(baseName)) {
-		                    file = path.toFile();
-		                    break;
-		                }
-		            }
-	            }
-	        }
-	
-	        if ( file.exists() ) {
-	            try {
-	                fis = new FileInputStream(file);
-	            } catch (final FileNotFoundException fnfe) {
-	                fis = null;
-	            }
-	        }
-	        
-	        String filename = file.getName();
-	        openStream: while ( fis == null ) {
-	            final File dir = file.getParentFile();
-	            final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
-	            
-	            // depending on which rollover actions have occurred, we could have 3 possibilities for the
-	            // filename that we need. The majority of the time, we will use the extension ".prov.indexed.gz"
-	            // because most often we are compressing on rollover and most often we have already finished
-	            // compressing by the time that we are querying the data.
-	            for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
-	                file = new File(dir, baseName + extension);
-	                if ( file.exists() ) {
-	                    try {
-	                        fis = new FileInputStream(file);
-	                        filename = baseName + extension;
-	                        break openStream;
-	                    } catch (final FileNotFoundException fnfe) {
-	                        // file was modified by a RolloverAction after we verified that it exists but before we could
-	                        // create an InputStream for it. Start over.
-	                        fis = null;
-	                        continue openStream;
-	                    }
-	                }
-	            }
-	            
-	            break;
-	        }
-	
-	        if ( fis == null ) {
-	            throw new FileNotFoundException("Unable to locate file " + originalFile);
-	        }
-	
-	    	final File tocFile = TocUtil.getTocFile(file);
-	    	if ( tocFile.exists() ) {
-	    		final TocReader tocReader = new StandardTocReader(tocFile);
-	    		return new StandardRecordReader(fis, filename, tocReader);
-	    	} else {
-	    		return new StandardRecordReader(fis, filename);
-	    	}
+            if (!file.exists()) {
+                if (provenanceLogFiles != null) {
+                    final String baseName = LuceneUtil.substringBefore(file.getName(), ".") + ".";
+                    for (final Path path : provenanceLogFiles) {
+                        if (path.toFile().getName().startsWith(baseName)) {
+                            file = path.toFile();
+                            break;
+                        }
+                    }
+                }
+            }
+
+            if ( file.exists() ) {
+                try {
+                    fis = new FileInputStream(file);
+                } catch (final FileNotFoundException fnfe) {
+                    fis = null;
+                }
+            }
+
+            String filename = file.getName();
+            openStream: while ( fis == null ) {
+                final File dir = file.getParentFile();
+                final String baseName = LuceneUtil.substringBefore(file.getName(), ".");
+
+                // depending on which rollover actions have occurred, we could have 2 possibilities for the
+                // filename that we need. The majority of the time, we will use the extension ".prov.gz"
+                // because most often we are compressing on rollover and most often we have already finished
+                // compressing by the time that we are querying the data.
+                for ( final String extension : new String[] {".prov.gz", ".prov"} ) {
+                    file = new File(dir, baseName + extension);
+                    if ( file.exists() ) {
+                        try {
+                            fis = new FileInputStream(file);
+                            filename = baseName + extension;
+                            break openStream;
+                        } catch (final FileNotFoundException fnfe) {
+                            // file was modified by a RolloverAction after we verified that it exists but before we could
+                            // create an InputStream for it. Start over.
+                            fis = null;
+                            continue openStream;
+                        }
+                    }
+                }
+
+                break;
+            }
+
+            if ( fis == null ) {
+                throw new FileNotFoundException("Unable to locate file " + originalFile);
+            }
+
+            final File tocFile = TocUtil.getTocFile(file);
+            if ( tocFile.exists() ) {
+                final TocReader tocReader = new StandardTocReader(tocFile);
+                return new StandardRecordReader(fis, filename, tocReader);
+            } else {
+                return new StandardRecordReader(fis, filename);
+            }
         } catch (final IOException ioe) {
-        	if ( fis != null ) {
-        		try {
-        			fis.close();
-        		} catch (final IOException inner) {
-        			ioe.addSuppressed(inner);
-        		}
-        	}
-        	
-        	throw ioe;
+            if ( fis != null ) {
+                try {
+                    fis.close();
+                } catch (final IOException inner) {
+                    ioe.addSuppressed(inner);
+                }
+            }
+
+            throw ioe;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index 58f4dc2..d89fd6f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -28,31 +28,27 @@ public interface RecordWriter extends Closeable {
     /**
      * Writes header information to the underlying stream
      *
-     * @throws IOException
+     * @throws IOException if unable to write header information to the underlying stream
      */
     void writeHeader() throws IOException;
 
     /**
      * Writes the given record out to the underlying stream
      *
-     * @param record
-     * @param recordIdentifier
+     * @param record the record to write
+     * @param recordIdentifier the new identifier of the record
      * @return the number of bytes written for the given records
-     * @throws IOException
+     * @throws IOException if unable to write the record to the stream
      */
     long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException;
 
     /**
-     * Returns the number of Records that have been written to this RecordWriter
-     *
-     * @return
+     * @return the number of Records that have been written to this RecordWriter
      */
     int getRecordsWritten();
 
     /**
-     * Returns the file that this RecordWriter is writing to
-     *
-     * @return
+     * @return the file that this RecordWriter is writing to
      */
     File getFile();
 
@@ -73,19 +69,18 @@ public interface RecordWriter extends Closeable {
      * not immediately available, returns <code>false</code>; otherwise, obtains
      * the lock and returns <code>true</code>.
      *
-     * @return
+     * @return <code>true</code> if the lock was obtained, <code>false</code> otherwise.
      */
     boolean tryLock();
 
     /**
      * Syncs the content written to this writer to disk.
-     * @throws java.io.IOException
+     * @throws IOException if unable to sync content to disk
      */
     void sync() throws IOException;
 
     /**
-     * Returns the TOC Writer that is being used to write the Table of Contents for this journal
-     * @return
+     * @return the TOC Writer that is being used to write the Table of Contents for this journal
      */
     TocWriter getTocWriter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cd18b0b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index 47b7c7e..cf8f7b4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -25,14 +25,14 @@ import org.apache.nifi.provenance.toc.TocUtil;
 import org.apache.nifi.provenance.toc.TocWriter;
 
 public class RecordWriters {
-	private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024;	// 1 MB
+    private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
 
     public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
-    	return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+        return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
     }
-    
+
     public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
-    	final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
+        final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
         return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
     }