You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:36 UTC

[09/12] incubator-nifi git commit: NIFI-388: Initial implementation of prov repo; not yet finished but pushing so that the code is not lost

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index 1ace37f..bba6899 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -23,12 +23,16 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
@@ -37,9 +41,6 @@ import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
 import org.apache.nifi.provenance.journaling.index.EventIndexWriter;
 import org.apache.nifi.provenance.journaling.index.IndexManager;
-import org.apache.nifi.provenance.journaling.index.LuceneIndexSearcher;
-import org.apache.nifi.provenance.journaling.index.LuceneIndexWriter;
-import org.apache.nifi.provenance.journaling.index.MultiIndexSearcher;
 import org.apache.nifi.provenance.journaling.index.QueryUtils;
 import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
 import org.apache.nifi.provenance.journaling.journals.JournalReader;
@@ -47,8 +48,9 @@ import org.apache.nifi.provenance.journaling.journals.JournalWriter;
 import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
 import org.apache.nifi.provenance.journaling.journals.StandardJournalWriter;
 import org.apache.nifi.provenance.journaling.tasks.CompressionTask;
+import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
 import org.apache.nifi.provenance.journaling.toc.StandardTocWriter;
-import org.apache.nifi.provenance.journaling.toc.TocJournalReader;
+import org.apache.nifi.provenance.journaling.toc.TocReader;
 import org.apache.nifi.provenance.journaling.toc.TocWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,29 +64,32 @@ public class JournalingPartition implements Partition {
     
     private final File section;
     private final File journalsDir;
-    private final JournalingRepositoryConfig config;
+    private final IndexManager indexManager;
+    private final AtomicLong containerSize;
     private final ExecutorService executor;
-    
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-    private final Lock readLock = rwLock.readLock();
-    private final Lock writeLock = rwLock.writeLock();
+    private final JournalingRepositoryConfig config;
     
     private JournalWriter journalWriter;
     private TocWriter tocWriter;
     private int numEventsAtEndOfLastBlock = 0;
     private volatile long maxEventId = -1L;
     private volatile Long earliestEventTime = null;
+
+    private final Lock lock = new ReentrantLock();
+    private boolean writable = true;    // guarded by lock
+    private final List<File> timeOrderedJournalFiles = Collections.synchronizedList(new ArrayList<File>());
+    private final AtomicLong partitionSize = new AtomicLong(0L);
     
-    private final IndexManager indexManager;
-    
-    public JournalingPartition(final IndexManager indexManager, final String containerName, final int sectionIndex, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+    public JournalingPartition(final IndexManager indexManager, final String containerName, final int sectionIndex, final File sectionDir, 
+            final JournalingRepositoryConfig config, final AtomicLong containerSize, final ExecutorService compressionExecutor) throws IOException {
         this.indexManager = indexManager;
+        this.containerSize = containerSize;
         this.containerName = containerName;
         this.sectionIndex = sectionIndex;
         this.section = sectionDir;
         this.journalsDir = new File(section, "journals");
         this.config = config;
-        this.executor = executor;
+        this.executor = compressionExecutor;
         
         if (!journalsDir.exists() && !journalsDir.mkdirs()) {
             throw new IOException("Could not create directory " + section);
@@ -119,18 +124,24 @@ public class JournalingPartition implements Partition {
     
     @Override
     public List<JournaledProvenanceEvent> registerEvents(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
-        writeLock.lock();
+        if ( events.isEmpty() ) {
+            return Collections.emptyList();
+        }
+        
+        lock.lock();
         try {
+            if ( !writable ) {
+                throw new IOException("Cannot write to partition " + this + " because there was previously a write failure. The partition will fix itself in time if I/O problems are resolved");
+            }
+            
             final JournalWriter writer = getJournalWriter(firstEventId);
     
-            if ( !events.isEmpty() ) {
-                final int eventsWritten = writer.getEventCount();
-                if ( eventsWritten - numEventsAtEndOfLastBlock > config.getBlockSize() ) {
-                    writer.finishBlock();
-                    tocWriter.addBlockOffset(writer.getSize());
-                    numEventsAtEndOfLastBlock = eventsWritten;
-                    writer.beginNewBlock();
-                }
+            final int eventsWritten = writer.getEventCount();
+            if ( eventsWritten - numEventsAtEndOfLastBlock > config.getBlockSize() ) {
+                writer.finishBlock();
+                tocWriter.addBlockOffset(writer.getSize());
+                numEventsAtEndOfLastBlock = eventsWritten;
+                writer.beginNewBlock();
             }
     
             writer.write(events, firstEventId);
@@ -139,7 +150,7 @@ public class JournalingPartition implements Partition {
             long id = firstEventId;
             for (final ProvenanceEventRecord event : events) {
                 final JournaledStorageLocation location = new JournaledStorageLocation(containerName, String.valueOf(sectionIndex), 
-                        String.valueOf(writer.getJournalId()), tocWriter.getCurrentBlockIndex(), id++);
+                        writer.getJournalId(), tocWriter.getCurrentBlockIndex(), id++);
                 final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location);
                 storedEvents.add(storedEvent);
             }
@@ -169,8 +180,11 @@ public class JournalingPartition implements Partition {
             }
             
             return storedEvents;
+        } catch (final IOException ioe) {
+            writable = false;
+            throw ioe;
         } finally {
-            writeLock.unlock();
+            lock.unlock();
         }
     }
 
@@ -194,43 +208,66 @@ public class JournalingPartition implements Partition {
         return false;
     }
     
+    private void updateSize(final long delta) {
+        partitionSize.addAndGet(delta);
+        containerSize.addAndGet(delta);
+    }
+    
     // MUST be called with write lock held.
+    /**
+     * Rolls over the current journal (if any) and begins writing to a new journal.
+     * 
+     * <p>
+     * <b>NOTE:</b> This method MUST be called with this partition's lock held!
+     * </p>
+     * 
+     * @param firstEventId the ID of the first event to add to this journal
+     * @throws IOException
+     */
     private void rollover(final long firstEventId) throws IOException {
-        // TODO: Rework how rollover works because we now have index manager!!
-        
         // if we have a writer already, close it and initiate rollover actions
+        final File finishedFile = journalWriter == null ? null : journalWriter.getJournalFile();
         if ( journalWriter != null ) {
             journalWriter.finishBlock();
             journalWriter.close();
             tocWriter.close();
 
-            final EventIndexWriter curWriter = getIndexWriter();
+            final File finishedTocFile = tocWriter.getFile();
+            updateSize(finishedFile.length());
+            
             executor.submit(new Runnable() {
                 @Override
                 public void run() {
-                    try {
-                        curWriter.sync();
-                    } catch (final IOException e) {
-                        
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
+                    if ( config.isCompressOnRollover() ) {
+                        final long originalSize = finishedFile.length();
+                        final long compressedFileSize = new CompressionTask(finishedFile, journalWriter.getJournalId(), finishedTocFile).call();
+                        final long sizeAdded = compressedFileSize - originalSize;
+                        updateSize(sizeAdded);
                     }
                 }
             });
             
-            if ( config.isCompressOnRollover() ) {
-                final File finishedFile = journalWriter.getJournalFile();
-                final File finishedTocFile = tocWriter.getFile();
-                executor.submit(new CompressionTask(finishedFile, journalWriter.getJournalId(), finishedTocFile));
-            }
+            timeOrderedJournalFiles.add(finishedFile);
         }
         
         // create new writers and reset state.
         final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);
         journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());
-        tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync());
-        tocWriter.addBlockOffset(journalWriter.getSize());
-        numEventsAtEndOfLastBlock = 0;
+        try {
+            tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync());
+            tocWriter.addBlockOffset(journalWriter.getSize());
+            numEventsAtEndOfLastBlock = 0;
+        } catch (final Exception e) {
+            try {
+                journalWriter.close();
+            } catch (final IOException ioe) {}
+            
+            journalWriter = null;
+            
+            throw e;
+        }
+        
+        logger.debug("Rolling over {} from {} to {}", this, finishedFile, journalFile);
     }
     
 
@@ -252,33 +289,24 @@ public class JournalingPartition implements Partition {
     
     @Override
     public void restore() throws IOException {
-        writeLock.lock();
+        lock.lock();
         try {
             // delete or rename files if stopped during rollover; compress any files that haven't been compressed
             if ( !config.isReadOnly() ) {
                 final File[] children = journalsDir.listFiles();
                 if ( children != null ) {
-                    // find the latest journal.
-                    File latestJournal = null;
-                    long latestJournalId = -1L;
-                    
                     final List<File> journalFiles = new ArrayList<>();
                     
                     // find any journal files that either haven't been compressed or were partially compressed when
                     // we last shutdown and then restart compression.
                     for ( final File file : children ) {
                         final String filename = file.getName();
-                        if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) {
+                        if ( !filename.endsWith(JOURNAL_FILE_EXTENSION) ) {
                             continue;
                         }
                         
-                        final Long journalId = getJournalId(file);
-                        if ( journalId != null && journalId > latestJournalId ) {
-                            latestJournal = file;
-                            latestJournalId = journalId;
-                        }
-                        
                         journalFiles.add(file);
+                        updateSize(file.length());
                         
                         if ( !config.isCompressOnRollover() ) {
                             continue;
@@ -290,7 +318,15 @@ public class JournalingPartition implements Partition {
                                 // both the compressed and uncompressed version of this journal exist. The Compression Task was
                                 // not complete when we shutdown. Delete the compressed journal and toc and re-start the Compression Task.
                                 final File tocFile = QueryUtils.getTocFile(uncompressedFile);
-                                executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile));
+                                executor.submit(new Runnable() {
+                                    @Override
+                                    public void run() {
+                                        final long originalSize = uncompressedFile.length();
+                                        final long compressedSize = new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile).call();
+                                        final long sizeAdded = compressedSize - originalSize;
+                                        updateSize(sizeAdded);
+                                    }
+                                });
                             } else {
                                 // The compressed file exists but the uncompressed file does not. This means that we have finished
                                 // writing the compressed file and deleted the original journal file but then shutdown before
@@ -321,6 +357,22 @@ public class JournalingPartition implements Partition {
                         }
                     }
                     
+                    // we want to sort the list of all journal files.
+                    // we need to create a map of file to last mod time, rather than comparing
+                    // by using File.lastModified() because the File.lastModified() value could potentially
+                    // change while running the comparator, which violates the comparator's contract.
+                    timeOrderedJournalFiles.addAll(journalFiles);
+                    final Map<File, Long> lastModTimes = new HashMap<>();
+                    for ( final File journalFile : journalFiles ) {
+                        lastModTimes.put(journalFile, journalFile.lastModified());
+                    }
+                    Collections.sort(timeOrderedJournalFiles, new Comparator<File>() {
+                        @Override
+                        public int compare(final File o1, final File o2) {
+                            return lastModTimes.get(o1).compareTo(lastModTimes.get(o2));
+                        }
+                    });
+                    
                     // Get the first event in the earliest journal file so that we know what the earliest time available is
                     Collections.sort(journalFiles, new Comparator<File>() {
                         @Override
@@ -332,61 +384,83 @@ public class JournalingPartition implements Partition {
                     for ( final File journal : journalFiles ) {
                         try (final JournalReader reader = new StandardJournalReader(journal)) {
                             final ProvenanceEventRecord record = reader.nextEvent();
-                            this.earliestEventTime = record.getEventTime();
-                            break;
+                            if ( record != null ) {
+                                this.earliestEventTime = record.getEventTime();
+                                break;
+                            }
                         } catch (final IOException ioe) {
                         }
                     }
-                    
-                    // Whatever was the last journal for this partition, we need to remove anything for that journal
-                    // from the index and re-add them, and then sync the index. This allows us to avoid syncing
-                    // the index each time (we sync only on rollover) but allows us to still ensure that we index
-                    // all events.
-                    if ( latestJournal != null ) {
+
+                    // order such that latest journal file is first.
+                    Collections.reverse(journalFiles);
+                    for ( final File journal : journalFiles ) {
+                        try (final JournalReader reader = new StandardJournalReader(journal);
+                             final TocReader tocReader = new StandardTocReader(QueryUtils.getTocFile(journal))) {
+                            
+                            final long lastBlockOffset = tocReader.getLastBlockOffset();
+                            final ProvenanceEventRecord lastEvent = reader.getLastEvent(lastBlockOffset);
+                            if ( lastEvent != null ) {
+                                maxEventId = lastEvent.getEventId() + 1;
+                                break;
+                            }
+                        } catch (final EOFException eof) {}
+                    }
+
+                    // We need to re-index all of the journal files that have not been indexed. We can do this by determining
+                    // what is the largest event id that has been indexed for this container and section, and then re-indexing
+                    // any file that has an event with an id larger than that.
+                    // In order to do that, we iterate over the journal files in the order of newest (largest id) to oldest
+                    // (smallest id). If the first event id in a file is greater than the max indexed, we re-index the file.
+                    // Beyond that, we need to re-index one additional journal file because it's possible that if the first id
+                    // is 10 and the max index id is 15, the file containing 10 could also go up to 20. So we re-index one
+                    // file that has a min id less than what has been indexed; then we are done.
+                    final Long maxIndexedId = indexManager.getMaxEventId(containerName, String.valueOf(sectionIndex));
+                    final List<File> reindexJournals = new ArrayList<>();
+                    for ( final File journalFile : journalFiles ) {
+                        final Long firstEventId;
                         try {
-                            reindex(latestJournal);
-                        } catch (final EOFException eof) {
+                            firstEventId = getJournalId(journalFile);
+                        } catch (final NumberFormatException nfe) {
+                            // not a journal; skip this file
+                            continue;
                         }
+                        
+                        if ( maxIndexedId == null || firstEventId > maxIndexedId ) {
+                            reindexJournals.add(journalFile);
+                        } else {
+                            reindexJournals.add(journalFile);
+                            break;
+                        }
+                    }
+                    
+                    // Make sure that the indexes are not pointing to events that no longer exist.
+                    if ( journalFiles.isEmpty() ) {
+                        indexManager.deleteEventsBefore(containerName, sectionIndex, Long.MAX_VALUE);
+                    } else {
+                        final File firstJournalFile = journalFiles.get(0);
+                        indexManager.deleteEventsBefore(containerName, sectionIndex, getJournalId(firstJournalFile));
+                    }
+                    
+                    // The reindexJournals list is currently in order of newest to oldest. We need to re-index
+                    // in order of oldest to newest, so reverse the list.
+                    Collections.reverse(reindexJournals);
+                    
+                    logger.info("Reindexing {} journal files that were not found in index for container {} and section {}", reindexJournals.size(), containerName, sectionIndex);
+                    final long reindexStart = System.nanoTime();
+                    for ( final File journalFile : reindexJournals ) {
+                        indexManager.reindex(containerName, sectionIndex, getJournalId(journalFile), journalFile);
                     }
+                    final long reindexMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - reindexStart);
+                    logger.info("Finished reindexing {} journal files for container {} and section {}; reindex took {} millis", 
+                            reindexJournals.size(), containerName, sectionIndex, reindexMillis);
                 }
             }
         } finally {
-            writeLock.unlock();
+            lock.unlock();
         }
     }
 
-    
-    private void reindex(final File journalFile) throws IOException {
-        // TODO: Rework how recovery works because we now have index manager!!
-        try (final TocJournalReader reader = new TocJournalReader(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)), journalFile)) {
-            // We don't know which index contains the data for this journal, so remove the journal
-            // from both.
-            for (final LuceneIndexWriter indexWriter : indexWriters ) {
-                indexWriter.delete(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)));
-            }
-            
-            long maxId = -1L;
-            final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(1000);
-            JournaledProvenanceEvent event;
-            final LuceneIndexWriter indexWriter = indexWriters[0];
-            while ((event = reader.nextJournaledEvent()) != null ) {
-                storedEvents.add(event);
-                maxId = event.getEventId();
-                
-                if ( storedEvents.size() == 1000 ) {
-                    indexWriter.index(storedEvents);
-                    storedEvents.clear();
-                }
-            }
-
-            if ( !storedEvents.isEmpty() ) {
-                indexWriter.index(storedEvents);
-            }
-            
-            indexWriter.sync();
-            this.maxEventId = maxId;
-        }
-    }
 
     
     @Override
@@ -443,4 +517,129 @@ public class JournalingPartition implements Partition {
     public String toString() {
         return "Partition[section=" + sectionIndex + "]";
     }
+    
+    @Override
+    public void verifyWritable(final long nextId) throws IOException {
+        final long freeSpace = section.getFreeSpace();
+        final long freeMegs = freeSpace / 1024 / 1024;
+        if (freeMegs < 10) {
+            // if not at least 10 MB, don't even try to write
+            throw new IOException("Not Enough Disk Space: partition housing " + section + " has only " + freeMegs + " MB of storage available");
+        }
+        
+        rollover(nextId);
+        writable = true;
+    }
+    
+    private boolean delete(final File journalFile) {
+        for (int i=0; i < 10; i++) {
+            if ( journalFile.delete() || !journalFile.exists() ) {
+                return true;
+            } else {
+                try {
+                    Thread.sleep(100L);
+                } catch (final InterruptedException ie) {}
+            }
+        }
+        
+        return false;
+    }
+    
+    @Override
+    public void deleteOldEvents(final long earliestEventTimeToDelete) throws IOException {
+        final Set<File> removeFromTimeOrdered = new HashSet<>();
+        
+        final long start = System.nanoTime();
+        try {
+            for ( final File journalFile : timeOrderedJournalFiles ) {
+                // since these are time-ordered, if we find one that we don't want to delete, we're done.
+                if ( journalFile.lastModified() < earliestEventTimeToDelete ) {
+                    return;
+                }
+                
+                final long journalSize;
+                if ( journalFile.exists() ) {
+                    journalSize = journalFile.length();
+                } else {
+                    continue;
+                }
+                
+                if ( delete(journalFile) ) {
+                    removeFromTimeOrdered.add(journalFile);
+                } else {
+                    logger.warn("Failed to remove expired journal file {}; will attempt to delete again later", journalFile);
+                }
+                
+                updateSize(-journalSize);
+                final File tocFile = QueryUtils.getTocFile(journalFile);
+                if ( !delete(tocFile) ) {
+                    logger.warn("Failed to remove TOC file for expired journal file {}; will attempt to delete again later", journalFile);
+                }
+            }
+        } finally {
+            timeOrderedJournalFiles.removeAll(removeFromTimeOrdered);
+        }
+        
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        logger.info("Removed {} expired journal files from container {}, section {}; total time for deletion was {} millis", 
+                removeFromTimeOrdered.size(), containerName, sectionIndex, millis);
+    }
+    
+    
+    @Override
+    public void deleteOldest() throws IOException {
+        File removeFromTimeOrdered = null;
+        
+        final long start = System.nanoTime();
+        try {
+            for ( final File journalFile : timeOrderedJournalFiles ) {
+                final long journalSize;
+                if ( journalFile.exists() ) {
+                    journalSize = journalFile.length();
+                } else {
+                    continue;
+                }
+                
+                if ( delete(journalFile) ) {
+                    removeFromTimeOrdered = journalFile;
+                } else {
+                    throw new IOException("Cannot delete oldest event file " + journalFile);
+                }
+                
+                final File tocFile = QueryUtils.getTocFile(journalFile);
+                if ( !delete(tocFile) ) {
+                    logger.warn("Failed to remove TOC file for expired journal file {}; will attempt to delete again later", journalFile);
+                }
+                
+                updateSize(-journalSize);
+                indexManager.deleteEvents(containerName, sectionIndex, getJournalId(journalFile));
+            }
+        } finally {
+            if ( removeFromTimeOrdered != null ) {
+                timeOrderedJournalFiles.remove(removeFromTimeOrdered);
+                
+                final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                logger.info("Removed oldest event file {} from container {}, section {}; total time for deletion was {} millis", 
+                        removeFromTimeOrdered, containerName, sectionIndex, millis);
+            } else {
+                logger.debug("No journals to remove for {}", this);
+            }
+        }
+    }
+    
+    
+    @Override
+    public long getPartitionSize() {
+        return partitionSize.get();
+    }
+    
+    @Override
+    public long getContainerSize() {
+        return containerSize.get();
+    }
+    
+    @Override
+    public String getContainerName() {
+        return containerName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
index e77c8d5..9efae04 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
@@ -66,6 +66,12 @@ public interface Partition {
     long getMaxEventId();
     
     /**
+     * Returns the name of the container that this Partition operates on
+     * @return the name of the container that this Partition operates on
+     */
+    String getContainerName();
+    
+    /**
      * Returns the locations of events that have an id at least equal to minEventId, returning the events
      * with the smallest ID's possible that are greater than minEventId
      *
@@ -82,4 +88,39 @@ public interface Partition {
      * @throws IOException
      */
     Long getEarliestEventTime() throws IOException;
+    
+    /**
+     * Verifies that the partition is able to be written to. A Partition may need to create a new journal
+     * in order to verify. In this case, the nextId is provided so that the Partition knows the minimum event id
+     * that will be written to the partition
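+     * @param nextId the minimum event id that will be written to the partition
+     * @throws IOException if the partition is not able to be written to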
+     */
+    void verifyWritable(long nextId) throws IOException;
+    
+    
+    /**
+     * Deletes any journal for this partition that occurred before the given time
+     * @param earliestEventTimeToDelete the cutoff time; journals that occurred before it are deleted
+     * @throws IOException
+     */
+    void deleteOldEvents(long earliestEventTimeToDelete) throws IOException;
+
+    /**
+     * Returns the size of this partition in bytes
+     * @return the size of this partition in bytes
+     */
+    long getPartitionSize();
+    
+    /**
+     * Returns the size of the journals in the entire container
+     * @return the size of the journals in the entire container
+     */
+    long getContainerSize();
+    
+    /**
+     * Deletes the oldest journal from this partition
+     * @throws IOException
+     */
+    void deleteOldest() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
index c0a56c4..14d5b17 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
@@ -52,10 +52,23 @@ public interface PartitionManager {
      * @param writeAction specifies whether or not the action writes to the repository
      * @return
      */
-    <T> Set<T> withEachPartition(PartitionAction<T> action) throws IOException;
+//    <T> Set<T> withEachPartition(PartitionAction<T> action, boolean writeAction) throws IOException;
     
     /**
-     * Performs the given Action on each partition and returns the set of results. Unlike
+     * Performs the given Action on each partition and returns the set of results. This method does
+     * not use a thread pool to perform the request in parallel. This is desirable for very quick
+     * functions: when the thread pool is already fully utilized, a quick function submitted to it
+     * can take far longer than it should.
+     * 
+     * @param action the action to perform
+     * @param writeAction specifies whether or not the action writes to the repository
+     * @return
+     */
+    <T> Set<T> withEachPartitionSerially(PartitionAction<T> action, boolean writeAction) throws IOException;
+    
+    
+    /**
+     * Performs the given Action on each partition. Unlike
      * {@link #withEachPartition(PartitionAction))}, this method does not use the thread pool
      * in order to perform the request in parallel. This is desirable for very quick functions,
      * as the thread pool can be fully utilized, resulting in a quick function taking far longer
@@ -65,7 +78,7 @@ public interface PartitionManager {
      * @param writeAction specifies whether or not the action writes to the repository
      * @return
      */
-    <T> Set<T> withEachPartitionSerially(PartitionAction<T> action) throws IOException;
+    void withEachPartitionSerially(VoidPartitionAction action, boolean writeAction) throws IOException;
     
     /**
      * Performs the given Action to each partition, optionally waiting for the action to complete
@@ -74,7 +87,13 @@ public interface PartitionManager {
      * @param async if <code>true</code>, will perform the action asynchronously; if <code>false</code>, will
      *  wait for the action to complete before returning
      */
-    void withEachPartition(VoidPartitionAction action, boolean async);
+//    void withEachPartition(VoidPartitionAction action, boolean async);
     
     void shutdown();
+    
+    /**
+     * Triggers the Partition Manager to delete events from journals and indices based on the sizes of the containers
+     * and overall size of the repository
+     */
+    void deleteEventsBasedOnSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
index 10af697..9b5c442 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -19,19 +19,18 @@ package org.apache.nifi.provenance.journaling.partition;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 import org.apache.nifi.provenance.journaling.index.IndexManager;
@@ -43,16 +42,26 @@ public class QueuingPartitionManager implements PartitionManager {
     
     private static final Logger logger = LoggerFactory.getLogger(QueuingPartitionManager.class);
     
+    private final IndexManager indexManager;
     private final JournalingRepositoryConfig config;
     private final BlockingQueue<Partition> partitionQueue;
     private final JournalingPartition[] partitionArray;
-    private final ExecutorService executor;
+    private final AtomicLong eventIdGenerator;
     private volatile boolean shutdown = false;
     
-    private final AtomicInteger blacklistedCount = new AtomicInteger(0);
+    private final Set<Partition> blackListedPartitions = Collections.synchronizedSet(new HashSet<Partition>());
     
-    public QueuingPartitionManager(final IndexManager indexManager, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+    public QueuingPartitionManager(final IndexManager indexManager, final AtomicLong eventIdGenerator, final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService compressionExecutor) throws IOException {
+        this.indexManager = indexManager;
         this.config = config;
+        this.eventIdGenerator = eventIdGenerator;
+        
+        // We can consider using a PriorityQueue here instead. Keep track of how many Partitions are being written
+        // to for each container, as a container usually maps to a physical drive. Then, prioritize the queue
+        // so that the partitions that belong to Container A get a higher priority than those belonging to Container B
+        // if there are currently more partitions on Container B being written to (i.e., we prefer a partition for the
+        // container that is the least used at this moment). Would require significant performance testing to see if it
+        // really provides any benefit.
         this.partitionQueue = new LinkedBlockingQueue<>(config.getPartitionCount());
         this.partitionArray = new JournalingPartition[config.getPartitionCount()];
         
@@ -61,16 +70,24 @@ public class QueuingPartitionManager implements PartitionManager {
             containerTuples.add(new Tuple<>(entry.getKey(), entry.getValue()));
         }
         
+        final Map<String, AtomicLong> containerSizes = new HashMap<>();
+        for ( final String containerName : config.getContainers().keySet() ) {
+            containerSizes.put(containerName, new AtomicLong(0L));
+        }
+        
         for (int i=0; i < config.getPartitionCount(); i++) {
             final Tuple<String, File> tuple = containerTuples.get(i % containerTuples.size());
             final File section = new File(tuple.getValue(), String.valueOf(i));
             
-            final JournalingPartition partition = new JournalingPartition(indexManager, tuple.getKey(), i, section, config, executor);
+            final String containerName = tuple.getKey();
+            final JournalingPartition partition = new JournalingPartition(indexManager, containerName, i, 
+                    section, config, containerSizes.get(containerName), compressionExecutor);
+            partition.restore();
             partitionQueue.offer(partition);
             partitionArray[i] = partition;
         }
         
-        this.executor = executor;
+        workerExecutor.scheduleWithFixedDelay(new CheckBlackListedPartitions(), 30, 30, TimeUnit.SECONDS);
     }
     
     @Override
@@ -82,32 +99,63 @@ public class QueuingPartitionManager implements PartitionManager {
         }
     }
     
-    private Partition nextPartition() {
+    private Partition nextPartition(final boolean writeAction) {
         Partition partition = null;
         
-        while(partition == null) {
-            if (shutdown) {
-                throw new RuntimeException("Journaling Provenance Repository is shutting down");
-            }
-            
-            try {
-                partition = partitionQueue.poll(1, TimeUnit.SECONDS);
-            } catch (final InterruptedException ie) {
-            }
-            
-            if ( partition == null ) {
-                if ( blacklistedCount.get() >= config.getPartitionCount() ) {
-                    throw new RuntimeException("Cannot persist to the Journal Provenance Repository because all partitions have been blacklisted due to write failures");
+        final List<Partition> partitionsSkipped = new ArrayList<>();
+        try {
+            while (partition == null) {
+                if (shutdown) {
+                    throw new RuntimeException("Journaling Provenance Repository is shutting down");
+                }
+                
+                try {
+                    partition = partitionQueue.poll(1, TimeUnit.SECONDS);
+                } catch (final InterruptedException ie) {
+                }
+                
+                if ( partition == null ) {
+                    if ( blackListedPartitions.size() >= config.getPartitionCount() ) {
+                        throw new RuntimeException("Cannot persist to the Journal Provenance Repository because all partitions have been blacklisted due to write failures");
+                    }
+                    
+                    // we are out of partitions. Add back all of the partitions that we skipped so we 
+                    // can try them again.
+                    partitionQueue.addAll(partitionsSkipped);
+                    partitionsSkipped.clear();
+                } else if (writeAction) {
+                    // determine if the container is full.
+                    final String containerName = partition.getContainerName();
+                    long desiredMaxContainerCapacity = config.getMaxCapacity(containerName);
+                    
+                    // If no max capacity set for the container itself, use 1/N of repo max
+                    // where N is the number of containers
+                    if ( desiredMaxContainerCapacity == config.getMaxStorageCapacity() ) {
+                        desiredMaxContainerCapacity = config.getMaxStorageCapacity() / config.getContainers().size();
+                    }
+                    
+                    // if the partition's container is more than 10% over its desired capacity, we don't want to write to it.
+                    if ( partition.getContainerSize() > 1.1 * desiredMaxContainerCapacity ) {
+                        partitionsSkipped.add(partition);
+                        continue;
+                    }
                 }
             }
+        } finally {
+            partitionQueue.addAll( partitionsSkipped );
         }
         
         return partition;
     }
     
+    
+    private void blackList(final Partition partition) {
+        blackListedPartitions.add(partition);
+    }
+    
     @Override
     public <T> T withPartition(final PartitionAction<T> action, final boolean writeAction) throws IOException {
-        final Partition partition = nextPartition();
+        final Partition partition = nextPartition(writeAction);
 
         boolean ioe = false;
         try {
@@ -117,8 +165,7 @@ public class QueuingPartitionManager implements PartitionManager {
             throw e;
         } finally {
             if ( ioe && writeAction ) {
-                // We failed to write to this Partition. This partition will no longer be usable until NiFi is restarted!
-                blacklistedCount.incrementAndGet();
+                blackList(partition);
             } else {
                 partitionQueue.offer(partition);
             }
@@ -127,7 +174,7 @@ public class QueuingPartitionManager implements PartitionManager {
     
     @Override
     public void withPartition(final VoidPartitionAction action, final boolean writeAction) throws IOException {
-        final Partition partition = nextPartition();
+        final Partition partition = nextPartition(writeAction);
 
         boolean ioe = false;
         try {
@@ -137,8 +184,7 @@ public class QueuingPartitionManager implements PartitionManager {
             throw e;
         } finally {
             if ( ioe && writeAction ) {
-                // We failed to write to this Partition. This partition will no longer be usable until NiFi is restarted!
-                blacklistedCount.incrementAndGet();
+                blackList(partition);
             } else {
                 partitionQueue.offer(partition);
             }
@@ -146,89 +192,234 @@ public class QueuingPartitionManager implements PartitionManager {
     }
 
     
+//    @Override
+//    public <T> Set<T> withEachPartition(final PartitionAction<T> action) throws IOException {
+//        if ( writeAction && blackListedPartitions.size() > 0 ) {
+//            throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)");
+//        }
+//
+//        final Set<T> results = new HashSet<>(partitionArray.length);
+//        
+//        final Map<Partition, Future<T>> futures = new HashMap<>(partitionArray.length);
+//        for ( final Partition partition : partitionArray ) {
+//            final Callable<T> callable = new Callable<T>() {
+//                @Override
+//                public T call() throws Exception {
+//                    return action.perform(partition);
+//                }
+//            };
+//            
+//            final Future<T> future = executor.submit(callable);
+//            futures.put(partition, future);
+//        }
+//        
+//        for ( final Map.Entry<Partition, Future<T>> entry : futures.entrySet() ) {
+//            try {
+//                final T result = entry.getValue().get();
+//                results.add(result);
+//            } catch (final ExecutionException ee) {
+//                final Throwable cause = ee.getCause();
+//                if ( cause instanceof IOException ) {
+//                    throw (IOException) cause;
+//                } else {
+//                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+//                }
+//            } catch (InterruptedException e) {
+//                throw new RuntimeException(e);
+//            }
+//        }
+//        
+//        return results;
+//    }
+    
     @Override
-    public <T> Set<T> withEachPartition(final PartitionAction<T> action) throws IOException {
-        final Set<T> results = new HashSet<>(partitionArray.length);
-        
-        // TODO: Do not use blacklisted partitions.
-        final Map<Partition, Future<T>> futures = new HashMap<>(partitionArray.length);
-        for ( final Partition partition : partitionArray ) {
-            final Callable<T> callable = new Callable<T>() {
-                @Override
-                public T call() throws Exception {
-                    return action.perform(partition);
-                }
-            };
-            
-            final Future<T> future = executor.submit(callable);
-            futures.put(partition, future);
+    public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action, final boolean writeAction) throws IOException {
+        if ( writeAction && blackListedPartitions.size() > 0 ) {
+            throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)");
         }
         
-        for ( final Map.Entry<Partition, Future<T>> entry : futures.entrySet() ) {
-            try {
-                final T result = entry.getValue().get();
-                results.add(result);
-            } catch (final ExecutionException ee) {
-                final Throwable cause = ee.getCause();
-                if ( cause instanceof IOException ) {
-                    throw (IOException) cause;
-                } else {
-                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
-                }
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
+        final Set<T> results = new HashSet<>(partitionArray.length);
+        for ( final Partition partition : partitionArray ) {
+            results.add( action.perform(partition) );
         }
         
         return results;
     }
     
     @Override
-    public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action) throws IOException {
-        // TODO: Do not use blacklisted partitions.
-        final Set<T> results = new HashSet<>(partitionArray.length);
+    public void withEachPartitionSerially(final VoidPartitionAction action, final boolean writeAction) throws IOException {
+        if ( writeAction && blackListedPartitions.size() > 0 ) {
+            throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)");
+        }
+        
         for ( final Partition partition : partitionArray ) {
-            results.add( action.perform(partition) );
+            action.perform(partition);
+        }
+    }
+    
+//    @Override
+//    public void withEachPartition(final VoidPartitionAction action, final boolean async) {
+//        // TODO: skip blacklisted partitions
+//        final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);
+//        for ( final Partition partition : partitionArray ) {
+//            final Runnable runnable = new Runnable() {
+//                @Override
+//                public void run() {
+//                    try {
+//                        action.perform(partition);
+//                    } catch (final Throwable t) {
+//                        logger.error("Failed to perform action against " + partition + " due to " + t);
+//                        if ( logger.isDebugEnabled() ) {
+//                            logger.error("", t);
+//                        }
+//                    }
+//                }
+//            };
+//            
+//            final Future<?> future = executor.submit(runnable);
+//            futures.put(partition, future);
+//        }
+//        
+//        if ( !async ) {
+//            for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) {
+//                try {
+//                    // throw any exception thrown by runnable
+//                    entry.getValue().get();
+//                } catch (final ExecutionException ee) {
+//                    final Throwable cause = ee.getCause();
+//                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+//                } catch (InterruptedException e) {
+//                    throw new RuntimeException(e);
+//                }
+//            }
+//        }
+//    }
+    
+    private long getTotalSize() {
+        long totalSize = 0L;
+
+        for ( final JournalingPartition partition : partitionArray ) {
+            totalSize += partition.getPartitionSize();
         }
         
-        return results;
+        for ( final String containerName : config.getContainers().keySet() ) {
+            totalSize += indexManager.getSize(containerName);
+        }
+        
+        return totalSize;
+    }
+    
+    
+    /**
+     * Responsible for looking at partitions that have been marked as blacklisted and checking if they
+     * are able to be written to now. If so, adds them back to the partition queue; otherwise, leaves
+     * them as blacklisted
+     */
+    private class CheckBlackListedPartitions implements Runnable {
+        @Override
+        public void run() {
+            final Set<Partition> reclaimed = new HashSet<>();
+            
+            final Set<Partition> partitions = new HashSet<>(blackListedPartitions);
+            for ( final Partition partition : partitions ) {
+                final long nextId = eventIdGenerator.get();
+                if ( nextId <= 0 ) {
+                    // we don't have an ID to use yet. Don't attempt to do anything yet.
+                    return;
+                }
+                
+                try {
+                    partition.verifyWritable(nextId);
+                    reclaimed.add(partition);
+                 } catch (final IOException ioe) {
+                     logger.debug("{} is still blackListed due to {}", partition, ioe);
+                 }
+            }
+            
+            // any partition that is reclaimable is now removed from the set of blacklisted
+            // partitions and added back to our queue of partitions
+            blackListedPartitions.removeAll(reclaimed);
+            partitionQueue.addAll(reclaimed);
+        }
     }
     
+    
     @Override
-    public void withEachPartition(final VoidPartitionAction action, final boolean async) {
-        // TODO: Do not use blacklisted partitions.
-        final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);
-        for ( final Partition partition : partitionArray ) {
-            final Runnable runnable = new Runnable() {
-                @Override
-                public void run() {
+    public void deleteEventsBasedOnSize() {
+        final Map<String, List<JournalingPartition>> containerPartitionMap = new HashMap<>();
+        
+        for ( final JournalingPartition partition : partitionArray ) {
+            final String container = partition.getContainerName();
+            List<JournalingPartition> list = containerPartitionMap.get(container);
+            if ( list == null ) {
+                list = new ArrayList<>();
+                containerPartitionMap.put(container, list);
+            }
+            
+            list.add(partition);
+        }
+        
+        int iterations = 0;
+        for ( final String containerName : config.getContainers().keySet() ) {
+            // continue as long as we need to delete data from this container.
+            while (true) {
+                // don't hammer the disks if we can't delete anything
+                if ( iterations++ > 0 ) {
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {}
+                }
+                
+                final List<JournalingPartition> containerPartitions = containerPartitionMap.get(containerName);
+                final long containerSize = containerPartitions.get(0).getContainerSize();
+                final long maxContainerCapacity = config.getMaxCapacity(containerName);
+                if ( containerSize < maxContainerCapacity ) {
+                    break;
+                }
+                
+                logger.debug("Container {} exceeds max capacity of {} bytes with a size of {} bytes; deleting oldest events", containerName, maxContainerCapacity, containerSize);
+                
+                // container is too large. Delete oldest journal from each partition in this container.
+                for ( final Partition partition : containerPartitions ) {
                     try {
-                        action.perform(partition);
-                    } catch (final Throwable t) {
-                        logger.error("Failed to perform action against " + partition + " due to " + t);
+                        partition.deleteOldest();
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to delete events from {} due to {}", partition, ioe.toString());
                         if ( logger.isDebugEnabled() ) {
-                            logger.error("", t);
+                            logger.error("", ioe);
                         }
                     }
                 }
-            };
-            
-            final Future<?> future = executor.submit(runnable);
-            futures.put(partition, future);
+            }
         }
         
-        if ( !async ) {
-            for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) {
+        long totalSize;
+        iterations = 0;
+        while ((totalSize = getTotalSize()) >= config.getMaxStorageCapacity()) {
+            logger.debug("Provenance Repository exceeds max capacity of {} bytes with a size of {}; deleting oldest events", config.getMaxStorageCapacity(), totalSize);
+            
+            // don't hammer the disks if we can't delete anything
+            if ( iterations++ > 0 ) {
                 try {
-                    // throw any exception thrown by runnable
-                    entry.getValue().get();
-                } catch (final ExecutionException ee) {
-                    final Throwable cause = ee.getCause();
-                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
+                    Thread.sleep(1000L);
+                } catch (final InterruptedException ie) {}
+            }
+
+            for ( final Partition partition : partitionArray ) {
+                try {
+                    partition.deleteOldest();
+                } catch (final IOException ioe) {
+                    logger.error("Failed to delete events from {} due to {}", partition, ioe.toString());
+                    if ( logger.isDebugEnabled() ) {
+                        logger.error("", ioe);
+                    }
                 }
             }
+            
+            // don't hammer the disks if we can't delete anything
+            try {
+                Thread.sleep(1000L);
+            } catch (final InterruptedException ie) {}
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
index 4edc6ad..c5516aa 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
@@ -16,10 +16,14 @@
  */
 package org.apache.nifi.provenance.journaling.query;
 
+import java.io.Closeable;
+
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QuerySubmission;
 
-public interface QueryManager {
+public interface QueryManager extends Closeable {
     /**
      * Submits an asynchronous request to process the given query, returning an
      * identifier that can be used to fetch the results at a later time
@@ -39,4 +43,51 @@ public interface QueryManager {
      * @return
      */
     QuerySubmission retrieveQuerySubmission(String queryIdentifier);
+    
+    /**
+     * Returns the {@link ComputeLineageSubmission} associated with the given
+     * identifier
+     *
+     * @param lineageIdentifier
+     * @return
+     */
+    ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier);
+    
+    /**
+     * Submits a Lineage Computation to be completed and returns the
+     * {@link ComputeLineageSubmission} that indicates the status of the request
+     * and the results, if the computation is complete.
+     *
+     * @param flowFileUuid the UUID of the FlowFile for which the Lineage should
+     * be calculated
+     * @return a {@link ComputeLineageSubmission} object that can be used to
+     * check if the computing is complete and if so get the results
+     */
+    ComputeLineageSubmission submitLineageComputation(String flowFileUuid);
+    
+    /**
+     * Submits a request to expand the parents of the event with the given id
+     *
+     * @param eventRepo the repository from which to obtain the event information
+     * @param eventId the one-up id of the Event to expand
+     * @return
+     *
+     * @throws IllegalArgumentException if the given identifier identifies a
+     * Provenance Event that has a Type that is not expandable or if the
+     * identifier cannot be found
+     */
+    ComputeLineageSubmission submitExpandParents(final ProvenanceEventRepository eventRepo, long eventId);
+
+    /**
+     * Submits a request to expand the children of the event with the given id
+     *
+     * @param eventRepo the repository from which to obtain the event information
+     * @param eventId the one-up id of the Event
+     * @return
+     *
+     * @throws IllegalArgumentException if the given identifier identifies a
+     * Provenance Event that has a Type that is not expandable or if the
+     * identifier cannot be found
+     */
+    ComputeLineageSubmission submitExpandChildren(final ProvenanceEventRepository eventRepo, long eventId);
 }