Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:34 UTC

[07/12] incubator-nifi git commit: continuing to implement

continuing to implement


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a5f557ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a5f557ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a5f557ad

Branch: refs/heads/journaling-prov-repo
Commit: a5f557ad966c4fa70ae0a0239e3bf70dcd788ff0
Parents: b95e756
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Feb 15 10:45:29 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Feb 15 10:45:29 2015 -0500

----------------------------------------------------------------------
 .../JournalingProvenanceRepository.java         |  30 ++-
 .../config/JournalingRepositoryConfig.java      |   5 +
 .../journaling/index/EventIndexSearcher.java    |   2 +
 .../journaling/index/IndexManager.java          |  41 +++
 .../journaling/index/LuceneIndexManager.java    | 178 +++++++++++++
 .../journaling/index/LuceneIndexSearcher.java   |  23 ++
 .../journaling/index/LuceneIndexWriter.java     |   7 +-
 .../journaling/index/MultiIndexSearcher.java    | 112 ++++++++
 .../journals/StandardJournalWriter.java         |  40 ++-
 .../partition/JournalingPartition.java          | 254 ++++++++++---------
 .../journaling/partition/PartitionManager.java  |  13 +
 .../partition/QueuingPartitionManager.java      |  16 +-
 .../journaling/tasks/CompressionTask.java       |   2 +-
 .../journaling/toc/StandardTocWriter.java       |  25 +-
 14 files changed, 607 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
index 7911d73..cc97ee9 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@ -30,9 +30,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -47,6 +48,8 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.StorageLocation;
 import org.apache.nifi.provenance.StoredProvenanceEvent;
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
+import org.apache.nifi.provenance.journaling.index.LuceneIndexManager;
 import org.apache.nifi.provenance.journaling.index.QueryUtils;
 import org.apache.nifi.provenance.journaling.journals.JournalReader;
 import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
@@ -75,11 +78,12 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
     
     private final JournalingRepositoryConfig config;
     private final AtomicLong idGenerator = new AtomicLong(0L);
-    private final ExecutorService executor;
+    private final ScheduledExecutorService executor;
     
     private EventReporter eventReporter;    // effectively final
     private PartitionManager partitionManager;  // effectively final
     private QueryManager queryManager;    // effectively final
+    private IndexManager indexManager;    // effectively final
     
     public JournalingProvenanceRepository() throws IOException {
         this(createConfig());
@@ -87,7 +91,16 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
     
     public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException {
         this.config = config;
-        this.executor = Executors.newFixedThreadPool(config.getThreadPoolSize());
+        this.executor = Executors.newScheduledThreadPool(config.getThreadPoolSize(), new ThreadFactory() {
+            private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
+            
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = defaultFactory.newThread(r);
+                thread.setName("Provenance Repository Worker Thread");
+                return thread;
+            }
+        });
     }
     
     
@@ -156,7 +169,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
     public synchronized void initialize(final EventReporter eventReporter) throws IOException {
         this.eventReporter = eventReporter;
         
-        this.partitionManager = new QueuingPartitionManager(config, executor);
+        this.indexManager = new LuceneIndexManager(config, executor);
+        this.partitionManager = new QueuingPartitionManager(indexManager, config, executor);
         this.queryManager = new StandardQueryManager(partitionManager, config, 10);
     }
 
@@ -312,7 +326,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
 
     @Override
     public Long getMaxEventId() throws IOException {
-        final Set<Long> maxIds = partitionManager.withEachPartition(new PartitionAction<Long>() {
+        final Set<Long> maxIds = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() {
             @Override
             public Long perform(final Partition partition) throws IOException {
                 return partition.getMaxEventId();
@@ -374,6 +388,10 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
             partitionManager.shutdown();
         }
         
+        indexManager.close();
+        
+        // TODO: make sure that all are closed here!
+        
         executor.shutdown();
     }
 
@@ -390,7 +408,7 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
     @Override
     public Long getEarliestEventTime() throws IOException {
         // Get the earliest event timestamp for each partition
-        final Set<Long> earliestTimes = partitionManager.withEachPartition(new PartitionAction<Long>() {
+        final Set<Long> earliestTimes = partitionManager.withEachPartitionSerially(new PartitionAction<Long>() {
             @Override
             public Long perform(final Partition partition) throws IOException {
                 return partition.getEarliestEventTime();
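
The constructor change above swaps a fixed thread pool for a ScheduledExecutorService built with a naming ThreadFactory, so the same pool can run periodic work and its threads are recognizable in thread dumps. A minimal stand-alone sketch of that pattern, using plain JDK classes only (the pool size, thread name, and task here are illustrative, not taken from the repository):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;

    public class NamedSchedulerSketch {
        public static void main(final String[] args) throws InterruptedException {
            final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

            // Wrap the default factory so every pool thread carries a descriptive name.
            final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2, new ThreadFactory() {
                @Override
                public Thread newThread(final Runnable r) {
                    final Thread thread = defaultFactory.newThread(r);
                    thread.setName("Provenance Repository Worker Thread");
                    return thread;
                }
            });

            // A scheduled pool also allows periodic work (such as index syncing) to run on the same threads.
            executor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " ran a periodic task");
                }
            }, 1, 1, TimeUnit.SECONDS);

            Thread.sleep(3000L);
            executor.shutdown();
        }
    }

One variant worth considering is appending a counter to each thread's name so individual workers can be told apart when several are busy at once.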

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
index 8998932..18871c7 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
@@ -35,6 +35,7 @@ public class JournalingRepositoryConfig {
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int partitionCount = 16;
     private int blockSize = 5000;
+    private int indexesPerContainer = 2;
 
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();
@@ -51,6 +52,10 @@ public class JournalingRepositoryConfig {
         return readOnly;
     }
 
+    public int getIndexesPerContainer() {
+        return indexesPerContainer;
+    }
+    
     /**
      * Specifies where the repository will store data
      *

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
index b669c53..753378d 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/EventIndexSearcher.java
@@ -33,4 +33,6 @@ public interface EventIndexSearcher extends Closeable {
     SearchResult search(Query query) throws IOException;
     
     List<JournaledStorageLocation> getEvents(long minEventId, int maxResults) throws IOException;
+    
+    Long getMaxEventId(String container, String section) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
new file mode 100644
index 0000000..141b84a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/IndexManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IndexManager extends Closeable {
+
+    /**
+     * Returns an EventIndexWriter for the given container.
+     * @param container the name of the container whose index writer is desired
+     * @return an EventIndexWriter for the given container
+     */
+    EventIndexWriter getIndexWriter(final String container);
+
+    /**
+     * Returns the max event ID that has been indexed for the given container and section.
+     * 
+     * @param container the name of the container to check
+     * @param section the section of the container to check
+     * @return the max event ID that has been indexed for the given container and section, or null if no events have been indexed
+     */
+    Long getMaxEventId(String container, String section) throws IOException;
+    
+    EventIndexSearcher newIndexSearcher(String containerName) throws IOException;
+}
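
A rough caller-side sketch of how the interface above is meant to be used, based only on the signatures in this commit; the container name "container-1" and section "0" are made-up illustrative values, and error handling is omitted:

    import java.io.IOException;
    import java.util.List;

    import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
    import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
    import org.apache.nifi.provenance.journaling.index.IndexManager;

    public class IndexManagerUsageSketch {
        public static void printIndexSummary(final IndexManager indexManager) throws IOException {
            // Ask the manager how far indexing has progressed for one container/section.
            final Long maxId = indexManager.getMaxEventId("container-1", "0");
            System.out.println("Max indexed event id: " + maxId);

            // Searchers are Closeable, so open one per query and close it promptly.
            try (final EventIndexSearcher searcher = indexManager.newIndexSearcher("container-1")) {
                final List<JournaledStorageLocation> locations = searcher.getEvents(0L, 100);
                System.out.println("Found " + locations.size() + " storage locations");
            }
        }
    }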

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
new file mode 100644
index 0000000..d10fd00
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LuceneIndexManager implements IndexManager {
+    private static final Logger logger = LoggerFactory.getLogger(LuceneIndexManager.class);
+    
+    private final JournalingRepositoryConfig config;
+    private final ScheduledExecutorService executor;
+    
+    private final Map<String, List<LuceneIndexWriter>> writers = new HashMap<>();
+    private final Map<String, AtomicLong> writerIndexes = new HashMap<>();
+    
+    public LuceneIndexManager(final JournalingRepositoryConfig config, final ScheduledExecutorService executor) throws IOException {
+        this.config = config;
+        this.executor = executor;
+        
+        final int rolloverSeconds = (int) config.getJournalRolloverPeriod(TimeUnit.SECONDS);
+        if ( !config.isReadOnly() ) {
+            for ( final Map.Entry<String, File> entry : config.getContainers().entrySet() ) {
+                final String containerName = entry.getKey();
+                final File container = entry.getValue();
+                
+                final List<LuceneIndexWriter> writerList = new ArrayList<>(config.getIndexesPerContainer());
+                writers.put(containerName, writerList);
+                writerIndexes.put(containerName, new AtomicLong(0L));
+                
+                for ( int i=0; i < config.getIndexesPerContainer(); i++ ){
+                    final File indexDir = new File(container, "indices/" + i);
+                    writerList.add(new LuceneIndexWriter(indexDir, config));
+                }
+                
+                executor.scheduleWithFixedDelay(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            sync(containerName);
+                        } catch (final Throwable t) {
+                            logger.error("Failed to sync Provenance Repository Container {} due to {}", containerName, t);
+                            if ( logger.isDebugEnabled() ) {
+                                logger.error("", t);
+                            }
+                        }
+                    }
+                }, rolloverSeconds, rolloverSeconds, TimeUnit.SECONDS);
+            }
+        }
+    }
+    
+    @Override
+    public EventIndexSearcher newIndexSearcher(final String containerName) throws IOException {
+        final File containerDir = config.getContainers().get(containerName);
+        if ( containerDir == null ) {
+            throw new IllegalArgumentException();
+        }
+        
+        final List<EventIndexSearcher> searchers = new ArrayList<>();
+        
+        try {
+            if (config.isReadOnly()) {
+                for (int i=0; i < config.getIndexesPerContainer(); i++) {
+                    final File indexDir = new File(containerDir, "indices/" + i);
+                    searchers.add(new LuceneIndexSearcher(indexDir));
+                }
+            } else {
+                final List<LuceneIndexWriter> writerList = writers.get(containerName);
+                for ( final LuceneIndexWriter writer : writerList ) {
+                    searchers.add(writer.newIndexSearcher());
+                }
+            }
+        } catch (final IOException ioe) {
+            // If we failed to create a searcher, we need to close all that we've already created.
+            for ( final EventIndexSearcher searcher : searchers ) {
+                try {
+                    searcher.close();
+                } catch (final IOException ioe2) {
+                    ioe.addSuppressed(ioe2);
+                }
+            }
+            
+            throw ioe;
+        }
+        
+        return new MultiIndexSearcher(searchers);
+    }
+    
+    @Override
+    public LuceneIndexWriter getIndexWriter(final String container) {
+        if (config.isReadOnly() ) {
+            throw new IllegalStateException("Cannot obtain Index Writer because repository is read-only");
+        }
+        
+        final AtomicLong index = writerIndexes.get(container);
+        if (index == null ) {
+            throw new IllegalArgumentException();
+        }
+        
+        final long curVal = index.get();
+        final List<LuceneIndexWriter> writerList = writers.get(container);
+        return writerList.get((int) (curVal % writerList.size()));
+    }
+
+    @Override
+    public Long getMaxEventId(final String container, final String section) throws IOException {
+        final List<LuceneIndexWriter> writerList = writers.get(container);
+        if ( writerList == null ) {
+            return null;
+        }
+
+        Long max = null;
+        for ( final LuceneIndexWriter writer : writerList ) {
+            try (final EventIndexSearcher searcher = writer.newIndexSearcher()) {
+                final Long maxForWriter = searcher.getMaxEventId(container, section);
+                if ( maxForWriter != null ) {
+                    if (max == null || maxForWriter.longValue() > max.longValue() ) {
+                        max = maxForWriter;
+                    }
+                }
+            }
+        }
+        
+        return max;
+    }
+
+    
+    private void sync(final String containerName) throws IOException {
+        final AtomicLong index = writerIndexes.get(containerName);
+        final long curIndex = index.getAndIncrement();
+        
+        final List<LuceneIndexWriter> writerList = writers.get(containerName);
+        final EventIndexWriter toSync = writerList.get((int) (curIndex % writerList.size()));
+        toSync.sync();
+    }
+
+    @Override
+    public void close() throws IOException {
+        for ( final List<LuceneIndexWriter> writerList : writers.values() ) {
+            for ( final LuceneIndexWriter writer : writerList ) {
+                try {
+                    writer.close();
+                } catch (final IOException ioe) {
+                    logger.warn("Failed to close {} due to {}", writer, ioe);
+                    if ( logger.isDebugEnabled() ) {
+                        logger.warn("", ioe);
+                    }
+                }
+            }
+        }        
+    }
+}
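
The writers/writerIndexes bookkeeping above boils down to a rotation scheme: the write path always uses whichever writer the per-container counter currently points at, and the scheduled sync task advances the counter and syncs the writer that was current up to that point, so new events immediately flow to the next writer while the previous one is synced. A generic sketch of just that scheme (nothing Lucene-specific; the names here are invented):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class RotatingWriters<W> {
        private final List<W> writers;
        private final AtomicLong index = new AtomicLong(0L);

        public RotatingWriters(final List<W> writers) {
            this.writers = writers;
        }

        // Write path: return the writer that the counter currently points at.
        public W current() {
            return writers.get((int) (index.get() % writers.size()));
        }

        // Periodic task: advance the counter so new writes move to the next writer,
        // then hand back the previously current writer so it can be synced.
        public W rotateForSync() {
            final long previous = index.getAndIncrement();
            return writers.get((int) (previous % writers.size()));
        }
    }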

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
index 32dc7c3..a9dd1a5 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexSearcher.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.IndexSearcher;
@@ -31,6 +32,7 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortField.Type;
+import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
@@ -110,4 +112,25 @@ public class LuceneIndexSearcher implements EventIndexSearcher {
         return getLocations(topDocs);
     }
 
+    @Override
+    public Long getMaxEventId(final String container, final String section) throws IOException {
+        final BooleanQuery query = new BooleanQuery();
+        
+        if ( container != null ) {
+            query.add(new TermQuery(new Term(IndexedFieldNames.CONTAINER_NAME, container)), Occur.MUST);
+        }
+        
+        if ( section != null ) {
+            query.add(new TermQuery(new Term(IndexedFieldNames.SECTION_NAME, section)), Occur.MUST);
+        }
+        
+        final TopDocs topDocs = searcher.search(query, 1, new Sort(new SortField(IndexedFieldNames.EVENT_ID, Type.LONG, true)));
+        final List<JournaledStorageLocation> locations = getLocations(topDocs);
+        if ( locations.isEmpty() ) {
+            return null;
+        }
+        
+        return locations.get(0).getEventId();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
index e955ae5..b61ad34 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/LuceneIndexWriter.java
@@ -18,9 +18,11 @@ package org.apache.nifi.provenance.journaling.index;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -117,6 +119,7 @@ public class LuceneIndexWriter implements EventIndexWriter {
     public void index(final Collection<JournaledProvenanceEvent> events) throws IOException {
         long maxId = this.indexMaxId.get();
         
+        final List<Document> documents = new ArrayList<>(events.size());
         for ( final JournaledProvenanceEvent event : events ) {
             maxId = event.getEventId();
 
@@ -189,8 +192,10 @@ public class LuceneIndexWriter implements EventIndexWriter {
                 }
             }
 
-            indexWriter.addDocument(doc);
+            documents.add(doc);
         }
+        
+        indexWriter.addDocuments(documents);
 
         // Update the index's max id
         boolean updated = false;
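
The change above builds all of the Documents for a batch of events first and then hands them to the IndexWriter in a single addDocuments call, instead of calling addDocument once per event. A minimal sketch of that batching, with a made-up "id" field rather than the repository's actual schema:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;

    public class BatchIndexSketch {
        public static void indexBatch(final IndexWriter indexWriter, final List<String> eventIds) throws IOException {
            final List<Document> documents = new ArrayList<>(eventIds.size());
            for (final String eventId : eventIds) {
                final Document doc = new Document();
                doc.add(new StringField("id", eventId, Store.YES));
                documents.add(doc);
            }

            // One call for the whole batch rather than one addDocument call per event.
            indexWriter.addDocuments(documents);
        }
    }

As a side note, Lucene indexes a batch passed to addDocuments as one contiguous block; that property is incidental here, where the point appears simply to be adding the documents in a single call.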

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
new file mode 100644
index 0000000..d086ff5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/index/MultiIndexSearcher.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.index;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.search.Query;
+
+public class MultiIndexSearcher implements EventIndexSearcher {
+    private final List<EventIndexSearcher> searchers;
+    
+    public MultiIndexSearcher(final List<EventIndexSearcher> searchers) {
+        this.searchers = searchers;
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException suppressed = null;
+        
+        for ( final EventIndexSearcher searcher : searchers ) {
+            try {
+                searcher.close();
+            } catch (final IOException ioe) {
+                if ( suppressed == null ) {
+                    suppressed = ioe;
+                } else {
+                    suppressed.addSuppressed(ioe);
+                }
+            }
+        }
+        
+        if ( suppressed != null ) {
+            throw suppressed;
+        }
+    }
+
+    @Override
+    public SearchResult search(final Query query) throws IOException {
+        int totalHitCount = 0;
+        final List<JournaledStorageLocation> locations = new ArrayList<>();
+        
+        for ( final EventIndexSearcher searcher : searchers ) {
+            final SearchResult result = searcher.search(query);
+            totalHitCount += result.getTotalCount();
+            locations.addAll(result.getLocations());
+        }
+        
+        return new SearchResult(locations, totalHitCount);
+    }
+
+    @Override
+    public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxResults) throws IOException {
+        final List<JournaledStorageLocation> locations = new ArrayList<>();
+        int results = 0;
+        
+        // Perform search against all searchers and aggregate results.
+        for ( final EventIndexSearcher searcher : searchers ) {
+            final List<JournaledStorageLocation> searchLocations = searcher.getEvents(minEventId, maxResults);
+            locations.addAll(searchLocations);
+            if ( !searchLocations.isEmpty() ) {
+                results++;
+            }
+        }
+        
+        // Results from this call are sorted. If we have only 0 or 1 searchers that had results, then
+        // we don't need to sort anything. Otherwise, we need to sort and return just the first X
+        // number of results.
+        if ( results > 1 ) {
+            Collections.sort(locations);
+        }
+        
+        if ( locations.size() > maxResults ) {
+            return locations.subList(0, maxResults);
+        }
+        
+        return locations;
+    }
+
+    @Override
+    public Long getMaxEventId(final String container, final String section) throws IOException {
+        Long max = null;
+        for ( final EventIndexSearcher searcher : searchers ) {
+            final Long maxForWriter = searcher.getMaxEventId(container, section);
+            if ( maxForWriter != null ) {
+                if (max == null || maxForWriter.longValue() > max.longValue() ) {
+                    max = maxForWriter;
+                }
+            }
+        }
+        
+        return max;
+    }
+
+}
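
The close() above (and the reworked close() in StandardJournalWriter later in this commit) follows a common pattern: try to close every resource, remember the first failure, attach any later failures as suppressed exceptions, and only then rethrow. As a stand-alone utility the pattern looks roughly like this:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.List;

    public class CloseAllSketch {
        public static void closeAll(final List<? extends Closeable> resources) throws IOException {
            IOException firstFailure = null;

            for (final Closeable resource : resources) {
                try {
                    resource.close();
                } catch (final IOException ioe) {
                    if (firstFailure == null) {
                        firstFailure = ioe;
                    } else {
                        firstFailure.addSuppressed(ioe);
                    }
                }
            }

            // Every resource got a chance to close; surface the failures, if any, as one exception.
            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }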

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
index 5a289fe..af5f8de 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalWriter.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.provenance.journaling.journals;
 
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -29,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.journaling.io.Serializer;
 import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 
 
@@ -96,6 +96,9 @@ public class StandardJournalWriter implements JournalWriter {
     private OutputStream compressedStream;
     private ByteCountingOutputStream out;
     
+    private long recordBytes = 256L;
+    private long recordCount = 1L;
+    
     
     public StandardJournalWriter(final long journalId, final File journalFile, final boolean compressed, final Serializer serializer) throws IOException {
         this.journalId = journalId;
@@ -132,16 +135,38 @@ public class StandardJournalWriter implements JournalWriter {
     @Override
     public void close() throws IOException {
         finishBlock();
-        
-        if ( compressedStream != null ) {
+
+        IOException suppressed = null;
+        try {
             compressedStream.flush();
             compressedStream.close();
+        } catch (final IOException ioe) {
+            suppressed = ioe;
+        }
+        
+        try {
+            try {
+                uncompressedStream.flush();
+            } finally {
+                uncompressedStream.close();
+            }
+        } catch (final IOException ioe) {
+            if ( suppressed != null ) {
+                ioe.addSuppressed(suppressed);
+            }
+            throw ioe;
+        }
+        
+        if ( suppressed != null ) {
+            throw suppressed;
         }
     }
 
     @Override
     public void write(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final int avgRecordSize = (int) (recordBytes / recordCount);
+        
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream(avgRecordSize);
         final DataOutputStream serializerDos = new DataOutputStream(baos);
         
         final BufferedOutputStream bos = new BufferedOutputStream(out);
@@ -153,10 +178,13 @@ public class StandardJournalWriter implements JournalWriter {
                 serializer.serialize(event, serializerDos);
                 serializerDos.flush();
                 
-                final int recordLength = 8 + baos.size();   // record length is length of ID (8 bytes) plus length of serialized record
+                final int serializedLength = baos.size();
+                final int recordLength = 8 + serializedLength;   // record length is length of ID (8 bytes) plus length of serialized record
                 outDos.writeInt(recordLength);
                 outDos.writeLong(id++);
                 baos.writeTo(outDos);
+                recordBytes += recordLength;
+                recordCount++;
                 baos.reset();
                 
                 eventCount++;
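
The new recordBytes/recordCount fields keep a running average of the serialized record length so the scratch ByteArrayOutputStream for the next write can be pre-sized rather than repeatedly growing from its default capacity. Pulled out on its own (class and method names invented here), the bookkeeping is just:

    public class RecordSizeEstimator {
        // Seeded the same way as above (256 bytes over 1 record) so the first
        // estimate is a sensible non-zero buffer size.
        private long totalRecordBytes = 256L;
        private long recordCount = 1L;

        public int averageRecordSize() {
            return (int) (totalRecordBytes / recordCount);
        }

        public void record(final int recordLength) {
            totalRecordBytes += recordLength;
            recordCount++;
        }
    }

Each write would then start with new ByteArrayOutputStream(estimator.averageRecordSize()) and call record(recordLength) for every event that is serialized.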

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index 651c41e..1ace37f 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -35,8 +35,11 @@ import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
 import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+import org.apache.nifi.provenance.journaling.index.EventIndexWriter;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
 import org.apache.nifi.provenance.journaling.index.LuceneIndexSearcher;
 import org.apache.nifi.provenance.journaling.index.LuceneIndexWriter;
+import org.apache.nifi.provenance.journaling.index.MultiIndexSearcher;
 import org.apache.nifi.provenance.journaling.index.QueryUtils;
 import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
 import org.apache.nifi.provenance.journaling.journals.JournalReader;
@@ -55,13 +58,12 @@ public class JournalingPartition implements Partition {
     private static final String JOURNAL_FILE_EXTENSION = ".journal";
     
     private final String containerName;
-    private final String sectionName;
+    private final int sectionIndex;
     
     private final File section;
     private final File journalsDir;
     private final JournalingRepositoryConfig config;
     private final ExecutorService executor;
-    private final LuceneIndexWriter indexWriter;
     
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
@@ -73,9 +75,12 @@ public class JournalingPartition implements Partition {
     private volatile long maxEventId = -1L;
     private volatile Long earliestEventTime = null;
     
-    public JournalingPartition(final String containerName, final String sectionName, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+    private final IndexManager indexManager;
+    
+    public JournalingPartition(final IndexManager indexManager, final String containerName, final int sectionIndex, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+        this.indexManager = indexManager;
         this.containerName = containerName;
-        this.sectionName = sectionName;
+        this.sectionIndex = sectionIndex;
         this.section = sectionDir;
         this.journalsDir = new File(section, "journals");
         this.config = config;
@@ -88,22 +93,11 @@ public class JournalingPartition implements Partition {
         if ( journalsDir.exists() && journalsDir.isFile() ) {
             throw new IOException("Could not create directory " + section + " because a file already exists with this name");
         }
-        
-        if ( config.isReadOnly() ) {
-            indexWriter = null;
-        } else {
-            final File indexDir = new File(section, "index");
-            indexWriter = new LuceneIndexWriter(indexDir, config);
-        }
     }
     
     
     public EventIndexSearcher newIndexSearcher() throws IOException {
-        if (config.isReadOnly()) {
-            return new LuceneIndexSearcher(new File(section, "index"));
-        }
-        
-        return indexWriter.newIndexSearcher();
+        return indexManager.newIndexSearcher(containerName);
     }
     
     protected JournalWriter getJournalWriter(final long firstEventId) throws IOException {
@@ -118,6 +112,11 @@ public class JournalingPartition implements Partition {
         return journalWriter;
     }
     
+    // MUST be called with writeLock or readLock held.
+    private EventIndexWriter getIndexWriter() {
+        return indexManager.getIndexWriter(containerName);
+    }
+    
     @Override
     public List<JournaledProvenanceEvent> registerEvents(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
         writeLock.lock();
@@ -139,12 +138,13 @@ public class JournalingPartition implements Partition {
             final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(events.size());
             long id = firstEventId;
             for (final ProvenanceEventRecord event : events) {
-                final JournaledStorageLocation location = new JournaledStorageLocation(containerName, sectionName, 
+                final JournaledStorageLocation location = new JournaledStorageLocation(containerName, String.valueOf(sectionIndex), 
                         String.valueOf(writer.getJournalId()), tocWriter.getCurrentBlockIndex(), id++);
                 final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location);
                 storedEvents.add(storedEvent);
             }
             
+            final EventIndexWriter indexWriter = getIndexWriter();
             indexWriter.index(storedEvents);
             
             if ( config.isAlwaysSync() ) {
@@ -196,13 +196,28 @@ public class JournalingPartition implements Partition {
     
     // MUST be called with write lock held.
     private void rollover(final long firstEventId) throws IOException {
+        // TODO: Rework how rollover works because we now have index manager!!
+        
         // if we have a writer already, close it and initiate rollover actions
         if ( journalWriter != null ) {
             journalWriter.finishBlock();
             journalWriter.close();
             tocWriter.close();
-            indexWriter.sync();
-        
+
+            final EventIndexWriter curWriter = getIndexWriter();
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        curWriter.sync();
+                    } catch (final IOException e) {
+                        
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                }
+            });
+            
             if ( config.isCompressOnRollover() ) {
                 final File finishedFile = journalWriter.getJournalFile();
                 final File finishedTocFile = tocWriter.getFile();
@@ -213,7 +228,7 @@ public class JournalingPartition implements Partition {
         // create new writers and reset state.
         final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);
         journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());
-        tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false);
+        tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync());
         tocWriter.addBlockOffset(journalWriter.getSize());
         numEventsAtEndOfLastBlock = 0;
     }
@@ -237,112 +252,123 @@ public class JournalingPartition implements Partition {
     
     @Override
     public void restore() throws IOException {
-        // delete or rename files if stopped during rollover; compress any files that haven't been compressed
-        if ( !config.isReadOnly() ) {
-            final File[] children = journalsDir.listFiles();
-            if ( children != null ) {
-                // find the latest journal.
-                File latestJournal = null;
-                long latestJournalId = -1L;
-                
-                final List<File> journalFiles = new ArrayList<>();
-                
-                // find any journal files that either haven't been compressed or were partially compressed when
-                // we last shutdown and then restart compression.
-                for ( final File file : children ) {
-                    final String filename = file.getName();
-                    if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) {
-                        continue;
-                    }
+        writeLock.lock();
+        try {
+            // delete or rename files if stopped during rollover; compress any files that haven't been compressed
+            if ( !config.isReadOnly() ) {
+                final File[] children = journalsDir.listFiles();
+                if ( children != null ) {
+                    // find the latest journal.
+                    File latestJournal = null;
+                    long latestJournalId = -1L;
+                    
+                    final List<File> journalFiles = new ArrayList<>();
                     
-                    final Long journalId = getJournalId(file);
-                    if ( journalId != null && journalId > latestJournalId ) {
-                        latestJournal = file;
-                        latestJournalId = journalId;
+                    // find any journal files that either haven't been compressed or were partially compressed when
+                    // we last shutdown and then restart compression.
+                    for ( final File file : children ) {
+                        final String filename = file.getName();
+                        if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) {
+                            continue;
+                        }
+                        
+                        final Long journalId = getJournalId(file);
+                        if ( journalId != null && journalId > latestJournalId ) {
+                            latestJournal = file;
+                            latestJournalId = journalId;
+                        }
+                        
+                        journalFiles.add(file);
+                        
+                        if ( !config.isCompressOnRollover() ) {
+                            continue;
+                        }
+                        
+                        if ( filename.endsWith(CompressionTask.FILE_EXTENSION) ) {
+                            final File uncompressedFile = new File(journalsDir, filename.replace(CompressionTask.FILE_EXTENSION, ""));
+                            if ( uncompressedFile.exists() ) {
+                                // both the compressed and uncompressed version of this journal exist. The Compression Task was
+                                // not complete when we shutdown. Delete the compressed journal and toc and re-start the Compression Task.
+                                final File tocFile = QueryUtils.getTocFile(uncompressedFile);
+                                executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile));
+                            } else {
+                                // The compressed file exists but the uncompressed file does not. This means that we have finished
+                                // writing the compressed file and deleted the original journal file but then shutdown before
+                                // renaming the compressed file to the original filename. We can simply rename the compressed file
+                                // to the original file and then address the TOC file.
+                                final boolean rename = CompressionTask.rename(file, uncompressedFile);
+                                if ( !rename ) {
+                                    logger.warn("{} During recovery, failed to rename {} to {}", this, file, uncompressedFile);
+                                    continue;
+                                }
+                                
+                                // Check if the compressed TOC file exists. If not, we are finished.
+                                // If it does exist, then we know that it is complete, as described above, so we will go
+                                // ahead and replace the uncompressed version.
+                                final File tocFile = QueryUtils.getTocFile(uncompressedFile);
+                                final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + CompressionTask.FILE_EXTENSION);
+                                if ( !compressedTocFile.exists() ) {
+                                    continue;
+                                }
+                                
+                                tocFile.delete();
+                                
+                                final boolean renamedTocFile = CompressionTask.rename(compressedTocFile, tocFile);
+                                if ( !renamedTocFile ) {
+                                    logger.warn("{} During recovery, failed to rename {} to {}", this, compressedTocFile, tocFile);
+                                }
+                            }
+                        }
                     }
                     
-                    journalFiles.add(file);
+                    // Get the first event in the earliest journal file so that we know what the earliest time available is
+                    Collections.sort(journalFiles, new Comparator<File>() {
+                        @Override
+                        public int compare(final File o1, final File o2) {
+                            return Long.compare(getJournalId(o1), getJournalId(o2));
+                        }
+                    });
                     
-                    if ( !config.isCompressOnRollover() ) {
-                        continue;
+                    for ( final File journal : journalFiles ) {
+                        try (final JournalReader reader = new StandardJournalReader(journal)) {
+                            final ProvenanceEventRecord record = reader.nextEvent();
+                            this.earliestEventTime = record.getEventTime();
+                            break;
+                        } catch (final IOException ioe) {
+                        }
                     }
                     
-                    if ( filename.endsWith(CompressionTask.FILE_EXTENSION) ) {
-                        final File uncompressedFile = new File(journalsDir, filename.replace(CompressionTask.FILE_EXTENSION, ""));
-                        if ( uncompressedFile.exists() ) {
-                            // both the compressed and uncompressed version of this journal exist. The Compression Task was
-                            // not complete when we shutdown. Delete the compressed journal and toc and re-start the Compression Task.
-                            final File tocFile = QueryUtils.getTocFile(uncompressedFile);
-                            executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile));
-                        } else {
-                            // The compressed file exists but the uncompressed file does not. This means that we have finished
-                            // writing the compressed file and deleted the original journal file but then shutdown before
-                            // renaming the compressed file to the original filename. We can simply rename the compressed file
-                            // to the original file and then address the TOC file.
-                            final boolean rename = CompressionTask.rename(file, uncompressedFile);
-                            if ( !rename ) {
-                                logger.warn("{} During recovery, failed to rename {} to {}", this, file, uncompressedFile);
-                                continue;
-                            }
-                            
-                            // Check if the compressed TOC file exists. If not, we are finished.
-                            // If it does exist, then we know that it is complete, as described above, so we will go
-                            // ahead and replace the uncompressed version.
-                            final File tocFile = QueryUtils.getTocFile(uncompressedFile);
-                            final File compressedTocFile = new File(tocFile.getParentFile(), tocFile.getName() + CompressionTask.FILE_EXTENSION);
-                            if ( !compressedTocFile.exists() ) {
-                                continue;
-                            }
-                            
-                            tocFile.delete();
-                            
-                            final boolean renamedTocFile = CompressionTask.rename(compressedTocFile, tocFile);
-                            if ( !renamedTocFile ) {
-                                logger.warn("{} During recovery, failed to rename {} to {}", this, compressedTocFile, tocFile);
-                            }
+                    // Whatever was the last journal for this partition, we need to remove anything for that journal
+                    // from the index and re-add them, and then sync the index. This allows us to avoid syncing
+                    // the index each time (we sync only on rollover) but allows us to still ensure that we index
+                    // all events.
+                    if ( latestJournal != null ) {
+                        try {
+                            reindex(latestJournal);
+                        } catch (final EOFException eof) {
                         }
                     }
                 }
-                
-                // Get the first event in the earliest journal file so that we know what the earliest time available is
-                Collections.sort(journalFiles, new Comparator<File>() {
-                    @Override
-                    public int compare(final File o1, final File o2) {
-                        return Long.compare(getJournalId(o1), getJournalId(o2));
-                    }
-                });
-                
-                for ( final File journal : journalFiles ) {
-                    try (final JournalReader reader = new StandardJournalReader(journal)) {
-                        final ProvenanceEventRecord record = reader.nextEvent();
-                        this.earliestEventTime = record.getEventTime();
-                        break;
-                    } catch (final IOException ioe) {
-                    }
-                }
-                
-                // Whatever was the last journal for this partition, we need to remove anything for that journal
-                // from the index and re-add them, and then sync the index. This allows us to avoid syncing
-                // the index each time (we sync only on rollover) but allows us to still ensure that we index
-                // all events.
-                if ( latestJournal != null ) {
-                    try {
-                        reindex(latestJournal);
-                    } catch (final EOFException eof) {
-                    }
-                }
             }
+        } finally {
+            writeLock.unlock();
         }
     }
 
     
     private void reindex(final File journalFile) throws IOException {
-        try (final TocJournalReader reader = new TocJournalReader(containerName, sectionName, String.valueOf(getJournalId(journalFile)), journalFile)) {
-            indexWriter.delete(containerName, sectionName, String.valueOf(getJournalId(journalFile)));
+        // TODO: Rework how recovery works because we now have index manager!!
+        try (final TocJournalReader reader = new TocJournalReader(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)), journalFile)) {
+            // We don't know which index contains the data for this journal, so remove the journal
+            // from both.
+            for (final LuceneIndexWriter indexWriter : indexWriters ) {
+                indexWriter.delete(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)));
+            }
             
             long maxId = -1L;
             final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(1000);
             JournaledProvenanceEvent event;
+            final LuceneIndexWriter indexWriter = indexWriters[0];
             while ((event = reader.nextJournaledEvent()) != null ) {
                 storedEvents.add(event);
                 maxId = event.getEventId();
@@ -365,7 +391,7 @@ public class JournalingPartition implements Partition {
     
     @Override
     public List<JournaledStorageLocation> getEvents(final long minEventId, final int maxRecords) throws IOException {
-        try (final EventIndexSearcher searcher = indexWriter.newIndexSearcher()) {
+        try (final EventIndexSearcher searcher = newIndexSearcher()) {
             return searcher.getEvents(minEventId, maxRecords);
         }
     }
@@ -401,16 +427,6 @@ public class JournalingPartition implements Partition {
             }
         }
         
-        if ( indexWriter != null ) {
-            try {
-                indexWriter.close();
-            } catch (final IOException ioe) {
-                logger.warn("Failed to close {} due to {}", indexWriter, ioe);
-                if ( logger.isDebugEnabled() ) {
-                    logger.warn("", ioe);
-                }
-            }
-        }
     }
     
     @Override
@@ -425,6 +441,6 @@ public class JournalingPartition implements Partition {
     
     @Override
     public String toString() {
-        return "Partition[section=" + sectionName + "]";
+        return "Partition[section=" + sectionIndex + "]";
     }
 }
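
The recovery approach described in the comment above (drop whatever the index holds for the most
recent journal, re-add its events, then sync the index once) can be sketched in isolation as
follows. SimpleIndex, JournalEvent and RecoverySketch are hypothetical stand-ins for this
illustration only, not classes from this patch. Deleting before re-adding keeps the step
idempotent, so a crash during recovery just repeats the same work on the next start.

    import java.io.File;
    import java.io.IOException;
    import java.util.List;

    // Hypothetical index abstraction: only the operations the sketch needs.
    interface SimpleIndex {
        void delete(String journalId) throws IOException;          // drop all entries for a journal
        void index(List<JournalEvent> events) throws IOException;  // add events to the index
        void sync() throws IOException;                            // force index updates to disk
    }

    // Hypothetical event type: just enough state for the example.
    class JournalEvent {
        final long eventId;
        JournalEvent(final long eventId) { this.eventId = eventId; }
    }

    class RecoverySketch {
        // Only the journal that was being written at shutdown needs this treatment; earlier
        // journals were already synced to the index when they rolled over.
        static void recoverLastJournal(final SimpleIndex index, final File lastJournal,
                final List<JournalEvent> eventsFromJournal) throws IOException {
            final String journalId = lastJournal.getName();
            index.delete(journalId);         // remove any partially indexed entries for this journal
            index.index(eventsFromJournal);  // re-add every event read back from the journal
            index.sync();                    // a single sync, rather than one per event
        }
    }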

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
index edbf75b..c0a56c4 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
@@ -55,6 +55,19 @@ public interface PartitionManager {
     <T> Set<T> withEachPartition(PartitionAction<T> action) throws IOException;
     
     /**
+     * Performs the given Action on each partition and returns the set of results. Unlike
+     * {@link #withEachPartition(PartitionAction)}, this method does not use the thread pool
+     * to perform the requests in parallel. This is desirable for very quick functions, because
+     * a fully utilized thread pool could cause a quick function to take far longer than it
+     * should.
+     * 
+     * @param action the action to perform against each partition
+     * @return the set of results, one from each partition
+     * @throws IOException if unable to perform the action against a partition
+     */
+    <T> Set<T> withEachPartitionSerially(PartitionAction<T> action) throws IOException;
+    
+    /**
      * Performs the given Action to each partition, optionally waiting for the action to complete
      * @param action
      * @param writeAction
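
The serial variant documented above exists for actions that finish almost immediately, where
submitting one task per partition to the shared thread pool would mostly add scheduling overhead.
A rough, self-contained sketch of that idea is below; Partition and PartitionAction are trimmed
down to the bare minimum for the example, and getMaxEventId() is an assumed method used only for
illustration, not taken from this patch.

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    // Trimmed-down stand-ins for the interfaces in this patch, for illustration only.
    interface Partition {
        long getMaxEventId() throws IOException;   // assumed: a cheap, in-memory lookup
    }

    interface PartitionAction<T> {
        T perform(Partition partition) throws IOException;
    }

    class SerialIterationSketch {
        // Runs the action on the calling thread, one partition at a time, so a cheap query is
        // never stuck waiting behind long-running work queued on the shared pool.
        static <T> Set<T> withEachPartitionSerially(final Partition[] partitions,
                final PartitionAction<T> action) throws IOException {
            final Set<T> results = new HashSet<>(partitions.length);
            for (final Partition partition : partitions) {
                results.add(action.perform(partition));
            }
            return results;
        }

        // Example caller: gather the largest event id seen by each partition.
        static Set<Long> maxEventIds(final Partition[] partitions) throws IOException {
            return withEachPartitionSerially(partitions, new PartitionAction<Long>() {
                @Override
                public Long perform(final Partition partition) throws IOException {
                    return partition.getMaxEventId();
                }
            });
        }
    }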

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
index 51d90e2..10af697 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.IndexManager;
 import org.apache.nifi.util.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,7 @@ public class QueuingPartitionManager implements PartitionManager {
     
     private final AtomicInteger blacklistedCount = new AtomicInteger(0);
     
-    public QueuingPartitionManager(final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+    public QueuingPartitionManager(final IndexManager indexManager, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
         this.config = config;
         this.partitionQueue = new LinkedBlockingQueue<>(config.getPartitionCount());
         this.partitionArray = new JournalingPartition[config.getPartitionCount()];
@@ -64,7 +65,7 @@ public class QueuingPartitionManager implements PartitionManager {
             final Tuple<String, File> tuple = containerTuples.get(i % containerTuples.size());
             final File section = new File(tuple.getValue(), String.valueOf(i));
             
-            final JournalingPartition partition = new JournalingPartition(tuple.getKey(), String.valueOf(i), section, config, executor);
+            final JournalingPartition partition = new JournalingPartition(indexManager, tuple.getKey(), i, section, config, executor);
             partitionQueue.offer(partition);
             partitionArray[i] = partition;
         }
@@ -183,6 +184,17 @@ public class QueuingPartitionManager implements PartitionManager {
     }
     
     @Override
+    public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action) throws IOException {
+        // TODO: Do not use blacklisted partitions.
+        final Set<T> results = new HashSet<>(partitionArray.length);
+        for ( final Partition partition : partitionArray ) {
+            results.add( action.perform(partition) );
+        }
+        
+        return results;
+    }
+    
+    @Override
     public void withEachPartition(final VoidPartitionAction action, final boolean async) {
         // TODO: Do not use blacklisted partitions.
         final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
index c23a405..a6a487b 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/tasks/CompressionTask.java
@@ -133,7 +133,7 @@ public class CompressionTask implements Runnable {
             try (final JournalReader journalReader = new StandardJournalReader(journalFile);
                 final JournalWriter compressedWriter = new StandardJournalWriter(journalId, compressedFile, true, new StandardEventSerializer());
                 final TocReader tocReader = new StandardTocReader(tocFile);
-                final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true)) {
+                final TocWriter compressedTocWriter = new StandardTocWriter(compressedTocFile, true, false)) {
                 
                 compress(journalReader, compressedWriter, tocReader, compressedTocWriter);
                 compressedWriter.sync();
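
The new third constructor argument above is the alwaysSync flag introduced in the StandardTocWriter
change below; the compression task passes false, presumably because a lost compressed copy can be
rebuilt from the original journal. The pattern itself (force the file to disk after every record
only when the flag is set, otherwise defer durability to close) can be sketched with plain java.io.
SyncOnDemandWriter is an illustrative name, not a class from this patch.

    import java.io.Closeable;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    class SyncOnDemandWriter implements Closeable {
        private final FileOutputStream fos;
        private final DataOutputStream dos;
        private final boolean alwaysSync;

        SyncOnDemandWriter(final File file, final boolean alwaysSync) throws IOException {
            this.fos = new FileOutputStream(file);
            this.dos = new DataOutputStream(fos);
            this.alwaysSync = alwaysSync;
        }

        // Appends one offset record; with alwaysSync the record is forced to disk immediately.
        void writeRecord(final long offset) throws IOException {
            dos.writeLong(offset);
            dos.flush();
            if (alwaysSync) {
                fos.getFD().sync();
            }
        }

        @Override
        public void close() throws IOException {
            if (alwaysSync) {
                fos.getFD().sync();   // final sync before closing, mirroring the TOC writer
            }
            dos.close();
        }
    }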

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a5f557ad/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
index 6058282..fea6057 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/toc/StandardTocWriter.java
@@ -40,6 +40,7 @@ public class StandardTocWriter implements TocWriter {
     
     private final File file;
     private final FileOutputStream fos;
+    private final boolean alwaysSync;
     private int index = 0;
     
     /**
@@ -48,7 +49,7 @@ public class StandardTocWriter implements TocWriter {
      * @param compressionFlag whether or not the journal is compressed
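+     * @param alwaysSync whether or not to force the table-of-contents file to disk after every write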
      * @throws FileNotFoundException 
      */
-    public StandardTocWriter(final File file, final boolean compressionFlag) throws IOException {
+    public StandardTocWriter(final File file, final boolean compressionFlag, final boolean alwaysSync) throws IOException {
         if ( file.exists() ) {
             throw new FileAlreadyExistsException(file.getAbsolutePath());
         }
@@ -59,11 +60,17 @@ public class StandardTocWriter implements TocWriter {
         
         this.file = file;
         fos = new FileOutputStream(file);
-        
-        fos.write(VERSION);
-        fos.write(compressionFlag ? 1 : 0);
+        this.alwaysSync = alwaysSync;
+
+        final byte[] header = new byte[2];
+        header[0] = VERSION;
+        header[1] = (byte) (compressionFlag ? 1 : 0);
+        fos.write(header);
         fos.flush();
-        fos.getFD().sync();
+        
+        if ( alwaysSync ) {
+            fos.getFD().sync();
+        }
     }
     
     @Override
@@ -73,7 +80,9 @@ public class StandardTocWriter implements TocWriter {
         dos.writeLong(offset);
         dos.flush();
         
-        fos.getFD().sync();
+        if ( alwaysSync ) {
+            fos.getFD().sync();
+        }
     }
     
     @Override
@@ -83,6 +92,10 @@ public class StandardTocWriter implements TocWriter {
 
     @Override
     public void close() throws IOException {
+        if (alwaysSync) {
+            fos.getFD().sync();
+        }
+        
         fos.close();
     }