You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:37 UTC

[10/12] incubator-nifi git commit: NIFI-388: Initial implementation of prov repo; not yet finished but pushing so that the code is not lost

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
index 18871c7..c1a7de8 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
@@ -28,6 +28,8 @@ import org.apache.nifi.provenance.search.SearchableField;
 
 public class JournalingRepositoryConfig {
     private Map<String, File> containers = new HashMap<>();
+    private Map<String, Long> containerCapacities = new HashMap<>();
+    
     private long expirationMillis = TimeUnit.MILLISECONDS.convert(24, TimeUnit.HOURS);
     private long storageCapacity = 1024L * 1024L * 1024L;   // 1 GB
     private long rolloverMillis = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
@@ -35,13 +37,16 @@ public class JournalingRepositoryConfig {
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int partitionCount = 16;
     private int blockSize = 5000;
-    private int indexesPerContainer = 2;
-
+    private int indexesPerContainer = 1;
+    private long expirationFrequency = TimeUnit.MINUTES.toNanos(2L);
+    
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
     private boolean compress = true;
     private boolean alwaysSync = false;
-    private int threadPoolSize = 4;
+    private int workerThreadPoolSize = 2;
+    private int queryThreadPoolSize = 2;
+    private int compressionThreadPoolSize = 1;
     private boolean readOnly = false;
 
     public void setReadOnly(final boolean readOnly) {
@@ -74,6 +79,27 @@ public class JournalingRepositoryConfig {
         this.containers = new HashMap<>(containers);
     }
 
+    
+    public long getMaxCapacity(final String containerName) {
+        final Long maxCapacity = containerCapacities.get(containerName);
+        if ( maxCapacity == null ) {
+            return getMaxStorageCapacity();
+        } else {
+            return maxCapacity;
+        }
+    }
+
+    public void setMaxContainerCapacities(final Map<String, Long> containerCapacities) {
+        this.containerCapacities = new HashMap<>(containerCapacities);
+    }
+
+    public void setMaxContainerCapacity(final String containerName, final long maxCapacity) {
+        if ( maxCapacity < 1 ) {
+            throw new IllegalArgumentException("Cannot set max container capacity for " + containerName + " to " + maxCapacity + " bytes");
+        }
+
+        this.containerCapacities.put(containerName, maxCapacity);
+    }
     /**
      * Returns the maximum amount of time that a given record will stay in the
      * repository
@@ -207,15 +233,28 @@ public class JournalingRepositoryConfig {
         this.compress = compress;
     }
 
-    public int getThreadPoolSize() {
-        return threadPoolSize;
+    public int getWorkerThreadPoolSize() {
+        return workerThreadPoolSize;
+    }
+
+    public void setWorkerThreadPoolSize(final int workerThreadPoolSize) {
+        if (workerThreadPoolSize < 1) {
+            throw new IllegalArgumentException();
+        }
+        this.workerThreadPoolSize = workerThreadPoolSize;
     }
+    
+    
 
-    public void setThreadPoolSize(final int queryThreadPoolSize) {
+    public int getQueryThreadPoolSize() {
+        return queryThreadPoolSize;
+    }
+
+    public void setQueryThreadPoolSize(int queryThreadPoolSize) {
         if (queryThreadPoolSize < 1) {
             throw new IllegalArgumentException();
         }
-        this.threadPoolSize = queryThreadPoolSize;
+        this.queryThreadPoolSize = queryThreadPoolSize;
     }
 
     /**
@@ -308,7 +347,7 @@ public class JournalingRepositoryConfig {
      * Returns the minimum number of Provenance Events that should be written to a single Block.
      * Events are written out in blocks, which are later optionally compressed. A larger block size
      * will potentially result in better compression. However, a smaller block size will result
-     * in better performance when reading the data. The default value is 100 events per block.
+     * in better performance when reading the data. The default value is 5000 events per block.
      * 
      * @return
      */
@@ -320,7 +359,7 @@ public class JournalingRepositoryConfig {
      * Sets the minimum number of Provenance Events that should be written to a single Block.
      * Events are written out in blocks, which are later optionally compressed. A larger block size
      * will potentially result in better compression. However, a smaller block size will result
-     * in better performance when reading the data. The default value is 100 events per block.
+     * in better performance when reading the data. The default value is 5000 events per block.
      * 
      * @return
      */
@@ -330,4 +369,20 @@ public class JournalingRepositoryConfig {
         }
         this.blockSize = blockSize;
     }
+
+    public int getCompressionThreadPoolSize() {
+        return compressionThreadPoolSize;
+    }
+
+    public void setCompressionThreadPoolSize(final int compressionThreadPoolSize) {
+        this.compressionThreadPoolSize = compressionThreadPoolSize;
+    }
+    
+    public long getExpirationFrequency(final TimeUnit unit) {
+        return unit.convert(expirationFrequency, TimeUnit.NANOSECONDS);
+    }
+    
+    public void setExpirationFrequency(final long period, final TimeUnit unit) {
+        this.expirationFrequency = unit.toNanos(period);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
index 753378d..85e02c0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance.journaling.index;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
@@ -32,7 +33,53 @@ public interface EventIndexSearcher extends Closeable {
      */
     SearchResult search(Query query) throws IOException;
     
+    /**
+     * Returns the locations of all events belonging to any FlowFile whose UUID is in the provided
+     * collection, restricted to events whose event time falls between earliestTime and latestTime.
+     * The returned list is ordered so that reading the records from the journals in that order
+     * is as efficient as possible
+     * 
+     * @param flowFileUuids
+     * @param earliestTime
+     * @param latestTime
+     * 
+     * @return
+     * @throws IOException
+     */
+    List<JournaledStorageLocation> getEventsForFlowFiles(Collection<String> flowFileUuids, long earliestTime, long latestTime) throws IOException;
+    
+    /**
+     * Returns the locations of events that have Event IDs at least equal to minEventId, and returns
+     * up to the given number of results
+     * 
+     * @param minEventId
+     * @param maxResults
+     * @return
+     * @throws IOException
+     */
     List<JournaledStorageLocation> getEvents(long minEventId, int maxResults) throws IOException;
     
+    /**
+     * Returns the largest event id that is known by the index being searched
+     * @param container
+     * @param section
+     * @return
+     * @throws IOException
+     */
     Long getMaxEventId(String container, String section) throws IOException;
+    
+    /**
+     * Returns the locations of the latest events for the index being searched
+     * @param numEvents
+     * @return
+     * @throws IOException
+     */
+    List<JournaledStorageLocation> getLatestEvents(int numEvents) throws IOException;
+    
+    /**
+     * Returns the total number of events that exist for the index being searched
+     * @return
+     * @throws IOException
+     */
+    long getNumberOfEvents() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
index 1f231e9..a151838 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexWriter.java
@@ -44,5 +44,23 @@ public interface EventIndexWriter extends Closeable {
      * @param journalId
      * @throws IOException
      */
-    void delete(String containerName, String section, String journalId) throws IOException;
+    void delete(String containerName, String section, Long journalId) throws IOException;
+    
+    /**
+     * Deletes any records that belong to the given container and section but have a journal Id less
+     * than the specified value
+     * @param containerName
+     * @param section
+     * @param journalId
+     * @throws IOException
+     */
+    void deleteEventsBefore(String containerName, String section, Long journalId) throws IOException;
+    
+    
+    /**
+     * Removes all events from the index that occurred before the given time
+     * @param earliestEventTimeToDelete
+     * @throws IOException
+     */
+    void deleteOldEvents(long earliestEventTimeToDelete) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexAction.java
new file mode 100644
index 0000000..6486d56
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexAction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.IOException;
+
+public interface IndexAction<T> {
+    T perform(EventIndexSearcher searcher) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
index 141b84a..34d1b18 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
@@ -17,7 +17,9 @@
 package org.apache.nifi.provenance.journaling.index;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
+import java.util.Set;
 
 public interface IndexManager extends Closeable {
 
@@ -37,5 +39,112 @@ public interface IndexManager extends Closeable {
      */
     Long getMaxEventId(String container, String section) throws IOException;
     
+    /**
+     * Returns the total number of indices for all containers
+     * @return
+     */
+    int getNumberOfIndices();
+    
+    /**
+     * Returns a new {@link EventIndexSearcher} that can be used to search the indices for
+     * the given container
+     * 
+     * @param containerName the containerName to search
+     * 
+     * @return
+     * @throws IOException
+     */
     EventIndexSearcher newIndexSearcher(String containerName) throws IOException;
+    
+    /**
+     * Executes the given action against each index and returns a Set of results,
+     * where each result is obtained from performing the given action against one index
+     * 
+     * @param action the action to perform
+     * @return
+     * @throws IOException
+     */
+    <T> Set<T> withEachIndex(IndexAction<T> action) throws IOException;
+    
+    /**
+     * Performs the given action against each index, waiting for the action to complete
+     * against each index before returning
+     * 
+     * @param action the action to perform against each index
+     * @throws IOException
+     */
+    void withEachIndex(VoidIndexAction action) throws IOException;
+    
+    /**
+     * Performs the given action against each index
+     * 
+     * @param action the action to perform
+     * 
+     * @param async if true, the method will return immediately and the actions will occur
+     * in the background. If <code>false</code>, the method will not return until the action
+     * has been performed against all indices
+     * 
+     * @throws IOException
+     */
+    void withEachIndex(VoidIndexAction action, boolean async) throws IOException;
+    
+    /**
+     * Removes any events that have a Storage Location that includes the provided containerName, sectionIndex, and journalId,
+     * and then re-adds all of the events that are in the given journalFile.
+     * @param containerName
+     * @param sectionIndex
+     * @param journalId
+     * @param journalFile
+     * @throws IOException
+     */
+    void reindex(final String containerName, final int sectionIndex, final Long journalId, final File journalFile) throws IOException;
+    
+    /**
+     * Syncs all indices
+     * @throws IOException
+     */
+    void sync() throws IOException;
+    
+    /**
+     * Returns the total number of events in all indices
+     * @return
+     */
+    long getNumberOfEvents() throws IOException;
+    
+    /**
+     * Removes all events from all indices that occurred before the given time
+     * @param earliestEventTimeToDelete
+     * @throws IOException
+     */
+    void deleteOldEvents(long earliestEventTimeToDelete) throws IOException;
+    
+    /**
+     * Deletes any events from the index that belong to the given container, section, and journal id
+     * 
+     * @param containerName
+     * @param sectionIndex
+     * @param journalId
+     * 
+     * @throws IOException
+     */
+    void deleteEvents(String containerName, int sectionIndex, Long journalId) throws IOException;
+    
+    /**
+     * Deletes any events from the index that belong to the given container and section but have
+     * a journal id before the value specified
+     * 
+     * @param containerName
+     * @param sectionIndex
+     * @param journalId
+     * 
+     * @throws IOException
+     */
+    void deleteEventsBefore(String containerName, int sectionIndex, Long journalId) throws IOException;
+    
+    /**
+     * Returns the size (in bytes) of the index for the given container
+     * @param containerName
+     * @return
+     */
+    long getSize(String containerName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
index d10fd00..e212342 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
@@ -20,13 +20,21 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.toc.TocJournalReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,14 +42,14 @@ public class LuceneIndexManager implements IndexManager {
     private static final Logger logger = LoggerFactory.getLogger(LuceneIndexManager.class);
     
     private final JournalingRepositoryConfig config;
-    private final ScheduledExecutorService executor;
+    private final ExecutorService queryExecutor;
     
     private final Map<String, List<LuceneIndexWriter>> writers = new HashMap<>();
     private final Map<String, AtomicLong> writerIndexes = new HashMap<>();
     
-    public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService executor) throws IOException {
+    public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService queryExecutor) throws IOException {
         this.config = config;
-        this.executor = executor;
+        this.queryExecutor = queryExecutor;
         
         final int rolloverSeconds = (int) config.getJournalRolloverPeriod(TimeUnit.SECONDS);
         if ( !config.isReadOnly() ) {
@@ -58,7 +66,7 @@ public class LuceneIndexManager implements IndexManager {
                     writerList.add(new LuceneIndexWriter(indexDir, config));
                 }
                 
-                executor.scheduleWithFixedDelay(new Runnable() {
+                workerExecutor.scheduleWithFixedDelay(new Runnable() {
                     @Override
                     public void run() {
                         try {
@@ -150,6 +158,15 @@ public class LuceneIndexManager implements IndexManager {
         return max;
     }
 
+    @Override
+    public void sync() throws IOException {
+        for ( final List<LuceneIndexWriter> writerList : writers.values() ) {
+            for ( final LuceneIndexWriter writer : writerList ) {
+                writer.sync();
+            }
+        }
+    }
+    
     
     private void sync(final String containerName) throws IOException {
         final AtomicLong index = writerIndexes.get(containerName);
@@ -175,4 +192,190 @@ public class LuceneIndexManager implements IndexManager {
             }
         }        
     }
+    
+    @Override
+    public <T> Set<T> withEachIndex(final IndexAction<T> action) throws IOException {
+        final Set<T> results = new HashSet<>();
+        final Map<String, Future<T>> futures = new HashMap<>();
+        final Set<String> containerNames = config.getContainers().keySet();
+        for (final String containerName : containerNames) {
+            final Callable<T> callable = new Callable<T>() {
+                @Override
+                public T call() throws Exception {
+                    try (final EventIndexSearcher searcher = newIndexSearcher(containerName)) {
+                        return action.perform(searcher);
+                    }
+                }
+            };
+            
+            final Future<T> future = queryExecutor.submit(callable);
+            futures.put(containerName, future);
+        }
+        
+        for ( final Map.Entry<String, Future<T>> entry : futures.entrySet() ) {
+            try {
+                final T result = entry.getValue().get();
+                results.add(result);
+            } catch (final ExecutionException ee) {
+                final Throwable cause = ee.getCause();
+                if ( cause instanceof IOException ) {
+                    throw (IOException) cause;
+                } else {
+                    throw new RuntimeException("Failed to query Container " + entry.getKey() + " due to " + cause, cause);
+                }
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        
+        return results;
+    }
+    
+    @Override
+    public void withEachIndex(final VoidIndexAction action) throws IOException {
+        withEachIndex(action, false);
+    }
+    
+    @Override
+    public void withEachIndex(final VoidIndexAction action, final boolean async) throws IOException {
+        final Map<String, Future<?>> futures = new HashMap<>();
+        final Set<String> containerNames = config.getContainers().keySet();
+
+        for (final String containerName : containerNames) {
+            final Callable<Object> callable = new Callable<Object>() {
+                @Override
+                public Object call() throws IOException {
+                    try (final EventIndexSearcher searcher = newIndexSearcher(containerName)) {
+                        action.perform(searcher);
+                        return null;
+                    } catch (final Throwable t) {
+                        if ( async ) {
+                            logger.error("Failed to perform action against container " + containerName + " due to " + t, t);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.error("", t);
+                            }
+                            
+                            return null;
+                        } else {
+                            throw new IOException("Failed to perform action against container " + containerName + " due to " + t, t);
+                        }
+                    }
+                }
+            };
+            
+            final Future<?> future = queryExecutor.submit(callable);
+            futures.put(containerName, future);
+        }
+        
+        if ( !async ) {
+            for ( final Map.Entry<String, Future<?>> entry : futures.entrySet() ) {
+                try {
+                    // throw any exception thrown by runnable
+                    entry.getValue().get();
+                } catch (final ExecutionException ee) {
+                    final Throwable cause = ee.getCause();
+                    if ( cause instanceof IOException ) {
+                        throw ((IOException) cause);
+                    }
+                    
+                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+    
+    @Override
+    public int getNumberOfIndices() {
+        return config.getContainers().size();
+    }
+    
+    @Override
+    public void deleteEvents(final String containerName, final int sectionIndex, final Long journalId) throws IOException {
+        final List<LuceneIndexWriter> writerList = writers.get(containerName);
+        for ( final LuceneIndexWriter writer : writerList ) {
+            writer.delete(containerName, String.valueOf(sectionIndex), journalId);
+        }
+    }
+    
+    @Override
+    public void deleteEventsBefore(final String containerName, final int sectionIndex, final Long journalId) throws IOException {
+        final List<LuceneIndexWriter> writerList = writers.get(containerName);
+        for ( final LuceneIndexWriter writer : writerList ) {
+            writer.deleteEventsBefore(containerName, String.valueOf(sectionIndex), journalId);
+        }
+    }
+    
+    @Override
+    public void reindex(final String containerName, final int sectionIndex, final Long journalId, final File journalFile) throws IOException {
+        deleteEvents(containerName, sectionIndex, journalId);
+        
+        final LuceneIndexWriter writer = getIndexWriter(containerName);
+        try (final TocJournalReader reader = new TocJournalReader(containerName, String.valueOf(sectionIndex), journalId, journalFile)) {
+            final List<JournaledProvenanceEvent> events = new ArrayList<>(1000);
+            JournaledProvenanceEvent event;
+            
+            while ((event = reader.nextJournaledEvent()) != null) {
+                events.add(event);
+                if ( events.size() >= 1000 ) {
+                    writer.index(events);
+                    events.clear();
+                }
+            }
+            
+            if (!events.isEmpty() ) {
+                writer.index(events);
+            }
+        }
+    }
+    
+    @Override
+    public long getNumberOfEvents() throws IOException {
+        final AtomicLong totalCount = new AtomicLong(0L);
+        withEachIndex(new VoidIndexAction() {
+            @Override
+            public void perform(final EventIndexSearcher searcher) throws IOException {
+                totalCount.addAndGet(searcher.getNumberOfEvents());
+            }
+        });
+        
+        return totalCount.get();
+    }
+    
+    @Override
+    public void deleteOldEvents(final long earliestEventTimeToDelete) throws IOException {
+        for ( final String containerName : config.getContainers().keySet() ) {
+            final List<LuceneIndexWriter> writerList = writers.get(containerName);
+            for ( final LuceneIndexWriter writer : writerList ) {
+                writer.deleteOldEvents(earliestEventTimeToDelete);
+            }
+        }
+    }
+    
+    
+    @Override
+    public long getSize(final String containerName) {
+        final File containerFile = config.getContainers().get(containerName);
+        final File indicesDir = new File(containerFile, "indices");
+        
+        return getSize(indicesDir);
+    }
+    
+    private long getSize(final File file) {
+        if ( file.isDirectory() ) {
+            long totalSize = 0L;
+            
+            final File[] children = file.listFiles();
+            if ( children != null ) {
+                for ( final File child : children ) {
+                    totalSize += getSize(child);
+                }
+            }
+            
+            return totalSize;
+        } else {
+            return file.length();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
index a9dd1a5..94bd3f8 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
@@ -19,6 +19,7 @@ package org.apache.nifi.provenance.journaling.index;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.lucene.document.Document;
@@ -27,6 +28,7 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.NumericRangeQuery;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
@@ -34,7 +36,9 @@ import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortField.Type;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
 import org.apache.nifi.provenance.search.Query;
 
@@ -43,16 +47,20 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
     private final IndexSearcher searcher;
     private final FSDirectory fsDirectory;
     
+    private final String description;
+    
     public LuceneIndexSearcher(final File indexDirectory) throws IOException {
         this.fsDirectory = FSDirectory.open(indexDirectory);
         this.reader = DirectoryReader.open(fsDirectory);
         this.searcher = new IndexSearcher(reader);
+        this.description = "LuceneIndexSearcher[indexDirectory=" + indexDirectory + "]";
     }
     
-    public LuceneIndexSearcher(final DirectoryReader reader) {
+    public LuceneIndexSearcher(final DirectoryReader reader, final File indexDirectory) {
         this.reader = reader;
         this.searcher = new IndexSearcher(reader);
         this.fsDirectory = null;
+        this.description = "LuceneIndexSearcher[indexDirectory=" + indexDirectory + "]";
     }
     
     @Override
@@ -76,29 +84,35 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
     private JournaledStorageLocation createLocation(final Document document) {
         final String containerName = document.get(IndexedFieldNames.CONTAINER_NAME);
         final String sectionName = document.get(IndexedFieldNames.SECTION_NAME);
-        final String journalId = document.get(IndexedFieldNames.JOURNAL_ID);
+        final long journalId = document.getField(IndexedFieldNames.JOURNAL_ID).numericValue().longValue();
         final int blockIndex = document.getField(IndexedFieldNames.BLOCK_INDEX).numericValue().intValue();
         final long eventId = document.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue();
         
         return new JournaledStorageLocation(containerName, sectionName, journalId, blockIndex, eventId);
     }
     
-    private List<JournaledStorageLocation> getLocations(final TopDocs topDocs) throws IOException {
+    private List<JournaledStorageLocation> getOrderedLocations(final TopDocs topDocs) throws IOException {
         final ScoreDoc[] scoreDocs = topDocs.scoreDocs;
         final List<JournaledStorageLocation> locations = new ArrayList<>(scoreDocs.length);
-        for ( final ScoreDoc scoreDoc : scoreDocs ) {
+        populateLocations(topDocs, locations);
+        
+        return locations;
+    }
+    
+    
+    private void populateLocations(final TopDocs topDocs, final Collection<JournaledStorageLocation> locations) throws IOException {
+        for ( final ScoreDoc scoreDoc : topDocs.scoreDocs ) {
             final Document document = reader.document(scoreDoc.doc);
             locations.add(createLocation(document));
         }
-        
-        return locations;
     }
     
+    
     @Override
     public SearchResult search(final Query provenanceQuery) throws IOException {
         final org.apache.lucene.search.Query luceneQuery = QueryUtils.convertQueryToLucene(provenanceQuery);
         final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
-        final List<JournaledStorageLocation> locations = getLocations(topDocs);
+        final List<JournaledStorageLocation> locations = getOrderedLocations(topDocs);
         
         return new SearchResult(locations, topDocs.totalHits);
     }
@@ -109,7 +123,7 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
         query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.EVENT_ID, minEventId, null, true, true), Occur.MUST);
         
         final TopDocs topDocs = searcher.search(query, maxResults, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG)));
-        return getLocations(topDocs);
+        return getOrderedLocations(topDocs);
     }
 
     @Override
@@ -125,7 +139,7 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
         }
         
         final TopDocs topDocs = searcher.search(query, 1, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true)));
-        final List<JournaledStorageLocation> locations = getLocations(topDocs);
+        final List<JournaledStorageLocation> locations = getOrderedLocations(topDocs);
         if ( locations.isEmpty() ) {
             return null;
         }
@@ -133,4 +147,45 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
         return locations.get(0).getEventId();
     }
 
+    @Override
+    public List<JournaledStorageLocation> getEventsForFlowFiles(final Collection<String> flowFileUuids, final long earliestTime, final long latestTime) throws IOException {
+        // No FlowFile UUIDs means nothing can match: return an empty list instead of dereferencing a null query.
+        if (flowFileUuids == null || flowFileUuids.isEmpty()) {
+            return new ArrayList<>();
+        }
+        
+        // Match events for any of the given FlowFiles: every UUID is a "SHOULD" clause and at least one must match.
+        final BooleanQuery flowFileIdQuery = new BooleanQuery();
+        for (final String flowFileUuid : flowFileUuids) {
+            flowFileIdQuery.add(new TermQuery(new Term(SearchableFields.FlowFileUUID.getSearchableFieldName(), flowFileUuid)), Occur.SHOULD);
+        }
+        flowFileIdQuery.setMinimumNumberShouldMatch(1);
+        
+        // Additionally require the Event Time to lie within [earliestTime, latestTime], both bounds inclusive.
+        flowFileIdQuery.add(NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(), 
+                earliestTime, latestTime, true, true), Occur.MUST);
+        
+        final TopDocs topDocs = searcher.search(flowFileIdQuery, 1000);
+        return getOrderedLocations(topDocs);
+    }
+    
+    
+    @Override
+    public List<JournaledStorageLocation> getLatestEvents(final int numEvents) throws IOException {
+        // Match every document, sorted on Event ID with the sort reversed, keeping at most numEvents hits.
+        final SortField eventIdReversed = new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true);
+        final TopFieldDocs newestHits = searcher.search(new MatchAllDocsQuery(), numEvents, new Sort(eventIdReversed));
+        
+        return getOrderedLocations(newestHits);
+    }
+    
+    @Override
+    public String toString() {
+        return description;   // text assigned in the constructors: "LuceneIndexSearcher[indexDirectory=...]"
+    }
+
+    @Override
+    public long getNumberOfEvents() {
+        return reader.numDocs();   // document count reported by the underlying Lucene reader
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
index b61ad34..d1b2c0e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
@@ -21,10 +21,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -33,6 +35,7 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
@@ -40,6 +43,8 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.NumericRangeQuery;
+import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
@@ -61,6 +66,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
     private final JournalingRepositoryConfig config;
     private final Set<SearchableField> nonAttributeSearchableFields;
     private final Set<SearchableField> attributeSearchableFields;
+    private final File indexDir;
     
     private final Directory directory;
     private final Analyzer analyzer;
@@ -68,6 +74,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
     private final AtomicLong indexMaxId = new AtomicLong(-1L);
     
     public LuceneIndexWriter(final File indexDir, final JournalingRepositoryConfig config) throws IOException {
+        this.indexDir = indexDir;
         this.config = config;
         
         attributeSearchableFields = Collections.unmodifiableSet(new HashSet<>(config.getSearchableAttributes()));
@@ -76,12 +83,20 @@ public class LuceneIndexWriter implements EventIndexWriter {
         directory = FSDirectory.open(indexDir);
         analyzer = new StandardAnalyzer();
         final IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LATEST, analyzer);
+        // Increase number of concurrent merges since we are on SSD:
+        final ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
+        writerConfig.setMergeScheduler(cms);
+        final int mergeThreads = Math.max(2, Math.min(4, config.getWorkerThreadPoolSize() / 2));
+        cms.setMaxMergesAndThreads(mergeThreads, mergeThreads);
+        
         indexWriter = new IndexWriter(directory, writerConfig);
     }
     
     public EventIndexSearcher newIndexSearcher() throws IOException {
+        logger.trace("Creating index searcher for {}", indexWriter);
+        
         final DirectoryReader reader = DirectoryReader.open(indexWriter, false);
-        return new LuceneIndexSearcher(reader);
+        return new LuceneIndexSearcher(reader, indexDir);
     }
 
     @Override
@@ -119,6 +134,8 @@ public class LuceneIndexWriter implements EventIndexWriter {
     public void index(final Collection<JournaledProvenanceEvent> events) throws IOException {
         long maxId = this.indexMaxId.get();
         
+        final long startNanos = System.nanoTime();
+        
         final List<Document> documents = new ArrayList<>(events.size());
         for ( final JournaledProvenanceEvent event : events ) {
             maxId = event.getEventId();
@@ -154,7 +171,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
             final JournaledStorageLocation location = event.getStorageLocation();
             doc.add(new StringField(IndexedFieldNames.CONTAINER_NAME, location.getContainerName(), Store.YES));
             doc.add(new StringField(IndexedFieldNames.SECTION_NAME, location.getSectionName(), Store.YES));
-            doc.add(new StringField(IndexedFieldNames.JOURNAL_ID, location.getJournalId(), Store.YES));
+            doc.add(new LongField(IndexedFieldNames.JOURNAL_ID, location.getJournalId(), Store.YES));
             doc.add(new LongField(IndexedFieldNames.BLOCK_INDEX, location.getBlockIndex(), Store.YES));
             doc.add(new LongField(IndexedFieldNames.EVENT_ID, location.getEventId(), Store.YES));
 
@@ -207,22 +224,59 @@ public class LuceneIndexWriter implements EventIndexWriter {
                 updated = true;
             }
         } while (!updated);
+        
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        logger.debug("Indexed {} events in {} millis with {}", events.size(), millis, this);
     }
     
     
     @Override
-    public void delete(final String containerName, final String section, final String journalId) throws IOException {
+    public void delete(final String containerName, final String section, final Long journalId) throws IOException {
+        final BooleanQuery query = new BooleanQuery();
+        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, containerName)), Occur.MUST));
+        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST));
+        query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.JOURNAL_ID, journalId, journalId, true, true), Occur.MUST);
+        
+        final long start = System.nanoTime();
+        indexWriter.deleteDocuments(query);
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        logger.info("Deleted events from {} that matched container={}, section={}, journal={} in {} millis", indexWriter, containerName, section, journalId, millis);
+    }
+    
+    @Override
+    public void deleteEventsBefore(final String containerName, final String section, final Long journalId) throws IOException {
         final BooleanQuery query = new BooleanQuery();
         query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, containerName)), Occur.MUST));
         query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST));
-        query.add(new BooleanClause(new TermQuery(new Term(IndexedFieldNames.JOURNAL_ID, journalId)), Occur.MUST));
+        query.add(NumericRangeQuery.newLongRange(IndexedFieldNames.JOURNAL_ID, 0L, journalId, true, false), Occur.MUST);
         
+        final long start = System.nanoTime();
         indexWriter.deleteDocuments(query);
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        logger.info("Deleted events from {} that matched container={}, section={}, journal less than {} in {} millis", indexWriter, containerName, section, journalId, millis);
     }
     
     
     @Override
+    public void deleteOldEvents(final long earliestEventTimeToDelete) throws IOException {
+        final Query query = NumericRangeQuery.newLongRange(SearchableFields.EventTime.getSearchableFieldName(), 0L, earliestEventTimeToDelete, true, true);
+        // The range is inclusive at both ends: events with an Event Time of 0 through earliestEventTimeToDelete are deleted.
+        final long start = System.nanoTime();
+        indexWriter.deleteDocuments(query);
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        logger.info("Deleted events from {} that occurred before {}; deletion took {} millis", this, new Date(earliestEventTimeToDelete), millis);
+    }
+    
+    @Override
     public void sync() throws IOException {
+        final long start = System.nanoTime();
         indexWriter.commit();
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        logger.info("Successfully sync'ed {} in {} millis", this, millis);
+    }
+    
+    @Override
+    public String toString() {
+        return "LuceneIndexWriter[indexDir=" + indexDir + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
index d086ff5..4accf50 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
@@ -18,7 +18,9 @@ package org.apache.nifi.provenance.journaling.index;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
@@ -63,6 +65,7 @@ public class MultiIndexSearcher implements EventIndexSearcher {
             locations.addAll(result.getLocations());
         }
         
+        Collections.sort(locations);
         return new SearchResult(locations, totalHitCount);
     }
 
@@ -109,4 +112,52 @@ public class MultiIndexSearcher implements EventIndexSearcher {
         return max;
     }
 
+    @Override
+    public List<JournaledStorageLocation> getEventsForFlowFiles(final Collection<String> flowFileUuids, final long earliestTime, final long latestTime) throws IOException {
+        // Collect the matches from every underlying searcher, then put the merged list into its natural order.
+        final List<JournaledStorageLocation> merged = new ArrayList<>();
+        for ( final EventIndexSearcher indexSearcher : searchers ) {
+            final List<JournaledStorageLocation> matches = indexSearcher.getEventsForFlowFiles(flowFileUuids, earliestTime, latestTime);
+            if ( matches != null ) {
+                merged.addAll(matches);
+            }
+        }
+        Collections.sort(merged);
+        return merged;
+    }
+    
+    @Override
+    public List<JournaledStorageLocation> getLatestEvents(final int numEvents) throws IOException {
+        final List<JournaledStorageLocation> locations = new ArrayList<>();
+        for ( final EventIndexSearcher searcher : searchers ) {
+            final List<JournaledStorageLocation> indexLocations = searcher.getLatestEvents(numEvents);
+            if ( indexLocations != null && !indexLocations.isEmpty() ) {
+                locations.addAll(indexLocations);
+            }
+        }
+        // Order the pooled results by ascending Event ID. NOTE(review): the list is not trimmed to numEvents here -- confirm callers expect that.
+        Collections.sort(locations, new Comparator<JournaledStorageLocation>() {
+            @Override
+            public int compare(final JournaledStorageLocation o1, final JournaledStorageLocation o2) {
+                return Long.compare(o1.getEventId(), o2.getEventId());
+            }
+        });
+        return locations;
+    }
+    
+    @Override
+    public long getNumberOfEvents() throws IOException {
+        // Sum the event counts reported by each of the underlying searchers.
+        long eventCount = 0L;
+        for ( final EventIndexSearcher indexSearcher : searchers ) {
+            eventCount = eventCount + indexSearcher.getNumberOfEvents();
+        }
+        
+        return eventCount;
+    }
+    
+    @Override
+    public String toString() {
+        return searchers.toString();   // delegates to the String form of the wrapped searchers
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
index 4ae4b16..d8dd5eb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/QueryUtils.java
@@ -91,6 +91,15 @@ public class QueryUtils {
     }
     
     
+    /**
+     * Orders the given StorageLocations so that we have a Map where the Key is a Journal file and the value is a List of JournaledStorageLocation in the order
+     * in which they should be read from the journal for optimal performance
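+     * @param locations the StorageLocations to group by journal file and order
+     * @param config the journaling repository configuration
+     * @return a Map from each Journal file to its JournaledStorageLocations, in the order in which they should be read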
+     * @throws FileNotFoundException
+     * @throws IOException
+     */
     public static Map<File, List<JournaledStorageLocation>> orderLocations(final List<StorageLocation> locations, final JournalingRepositoryConfig config) throws FileNotFoundException, IOException {
         final Map<File, List<JournaledStorageLocation>> map = new HashMap<>();
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/VoidIndexAction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/VoidIndexAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/VoidIndexAction.java
new file mode 100644
index 0000000..a1ae22f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/VoidIndexAction.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.IOException;
+
+public interface VoidIndexAction {
+    void perform(EventIndexSearcher searcher) throws IOException;   // acts on the given searcher; returns no result
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
index 535d1dd..3345f50 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/JournalReader.java
@@ -44,4 +44,12 @@ public interface JournalReader extends Closeable {
      * @return
      */
     long getPosition();
+    
+    /**
+     * Retrieves the last event from the journal, given the offset of the last Block in the journal
+     * @param blockOffset the offset of the last Block in the journal
+     * @return the last event in the journal; implementations may return null when no event could be read
+     * @throws IOException if the journal cannot be read
+     */
+    ProvenanceEventRecord getLastEvent(long blockOffset) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
index 2ec5131..df4a2d0 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -62,13 +62,21 @@ public class StandardJournalReader implements JournalReader {
     private void resetStreams() throws IOException {
         final InputStream bufferedIn = new BufferedInputStream(new FileInputStream(file));
         compressedStream = new ByteCountingInputStream(bufferedIn);
-        final DataInputStream dis = new DataInputStream(compressedStream);
-        final String codecName = dis.readUTF();
-        serializationVersion = dis.readInt();
-        compressed = dis.readBoolean();
-        deserializer = Deserializers.getDeserializer(codecName);
-        
-        resetDecompressedStream();
+        try {
+            final DataInputStream dis = new DataInputStream(compressedStream);
+            final String codecName = dis.readUTF();
+            serializationVersion = dis.readInt();
+            compressed = dis.readBoolean();
+            deserializer = Deserializers.getDeserializer(codecName);
+            
+            resetDecompressedStream();
+        } catch (final Exception e) {
+            try {
+                compressedStream.close();
+            } catch (final IOException ignore) {}
+            // The stream opened above has been closed; report the failure with the journal file identified.
+            throw new IOException("Failed to reset data stream when reading " + file, e);
+        }
     }
     
     private void resetDecompressedStream() throws IOException {
@@ -80,6 +88,15 @@ public class StandardJournalReader implements JournalReader {
     }
     
     @Override
+    public void close() throws IOException {
+        compressedStream.close();
+        // Close the second stream only when it is a different object, so that one stream is not closed twice.
+        if ( compressedStream != decompressedStream ) {
+            decompressedStream.close();
+        }
+    }
+    
+    @Override
     public ProvenanceEventRecord nextEvent() throws IOException {
         return nextEvent(true);
     }
@@ -145,7 +162,7 @@ public class StandardJournalReader implements JournalReader {
         // of the file. We do this because we know that the ID's are always increasing, so if we need an ID less
         // than the previous ID, we have to go backward in the file. We can't do this with streams, so start the
         // stream over.
-        if ( eventId < lastEventIdRead ) {
+        if ( eventId <= lastEventIdRead ) {
             close();
             resetStreams();
         }
@@ -167,12 +184,36 @@ public class StandardJournalReader implements JournalReader {
     }
 
     @Override
-    public void close() throws IOException {
-        decompressedStream.close();
-    }
-    
-    @Override
     public String toString() {
         return "StandardJournalReader[" + file + "]";
     }
+
+    @Override
+    public ProvenanceEventRecord getLastEvent(final long blockOffset) throws IOException {
+        // The stream only moves forward, so if we have already read past the requested block, start over.
+        if ( blockOffset < compressedStream.getBytesConsumed() ) {
+            close();
+            resetStreams();
+        }
+        // Skip ahead to the requested block (if we are not there yet) and begin decoding from that point.
+        final long bytesToSkip = blockOffset - compressedStream.getBytesConsumed();
+        if ( bytesToSkip > 0 ) {
+            StreamUtils.skip(compressedStream, bytesToSkip);
+            resetDecompressedStream();
+        }
+        
+        ProvenanceEventRecord lastReadRecord = null;
+        ProvenanceEventRecord event;
+        while ((event = nextEvent()) != null) {
+            lastReadRecord = event;
+        }
+        
+        // Nothing could be read from the given block: start over and scan the whole journal for its last event.
+        if ( lastReadRecord == null && blockOffset > 0L ) {
+            return getLastEvent(0L);
+        }
+        
+        // return the last event that we read, whether or not it was null
+        return lastReadRecord;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
index af5f8de..8d322b9 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -16,11 +16,17 @@
  */
 package org.apache.nifi.provenance.journaling.journals;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
@@ -30,6 +36,8 @@ import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -82,11 +90,14 @@ import org.apache.nifi.stream.io.ByteCountingOutputStream;
  * 
  */
 public class StandardJournalWriter implements JournalWriter {
+    private static final Logger logger = LoggerFactory.getLogger(StandardJournalWriter.class);
+    
     private final long journalId;
     private final File journalFile;
     private final boolean compressed;
     private final Serializer serializer;
     private final long creationTime = System.nanoTime();
+    private final String description;
     
     private int eventCount;
     private boolean blockStarted = false;
@@ -101,10 +112,28 @@ public class StandardJournalWriter implements JournalWriter {
     
     
     public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException {
+        if ( journalFile.exists() ) {
+            // Check if there is actually any data here.
+            try (final InputStream fis = new FileInputStream(journalFile);
+                 final InputStream bufferedIn = new BufferedInputStream(fis);
+                 final DataInputStream dis = new DataInputStream(bufferedIn) ) {
+                dis.readUTF();
+                dis.readInt();
+                dis.readBoolean();
+                final int nextByte = dis.read();
+                if ( nextByte > -1 ) {
+                    throw new FileAlreadyExistsException(journalFile.getAbsolutePath());
+                }
+            } catch (final EOFException eof) {
+                // If we catch an EOF, there's no real data here, so we can overwrite the file.
+            }
+        }
+        
         this.journalId = journalId;
         this.journalFile = journalFile;
         this.compressed = compressed;
         this.serializer = serializer;
+        this.description = "Journal Writer for " + journalFile;
         this.fos = new FileOutputStream(journalFile);
         
         uncompressedStream = new ByteCountingOutputStream(fos);
@@ -164,6 +193,7 @@ public class StandardJournalWriter implements JournalWriter {
 
     @Override
     public void write(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
+        final long start = System.nanoTime();
         final int avgRecordSize = (int) (recordBytes / recordCount);
         
         final ByteArrayOutputStream baos = new ByteArrayOutputStream(avgRecordSize);
@@ -192,6 +222,9 @@ public class StandardJournalWriter implements JournalWriter {
         } finally {
             outDos.flush();
         }
+        
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        logger.debug("Finished writing {} events to {} in {} millis", events.size(), this, millis);
     }
     
 
@@ -202,7 +235,10 @@ public class StandardJournalWriter implements JournalWriter {
 
     @Override
     public void sync() throws IOException {
+        final long start = System.nanoTime();
         fos.getFD().sync();
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()- start);
+        logger.debug("Successfully sync'ed {} in {} millis", this, millis);
     }
 
     @Override
@@ -259,6 +295,6 @@ public class StandardJournalWriter implements JournalWriter {
     
     @Override
     public String toString() {
-        return "Journal Writer for " + journalFile;
+        return description;
     }
 }