Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:36 UTC
[09/12] incubator-nifi git commit: NIFI-388: Initial implementation
of prov repo; not yet finished but pushing so that the code is not lost
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index 1ace37f..bba6899 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -23,12 +23,16 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
@@ -37,9 +41,6 @@ import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
import org.apache.nifi.provenance.journaling.index.EventIndexWriter;
import org.apache.nifi.provenance.journaling.index.IndexManager;
-import org.apache.nifi.provenance.journaling.index.LuceneIndexSearcher;
-import org.apache.nifi.provenance.journaling.index.LuceneIndexWriter;
-import org.apache.nifi.provenance.journaling.index.MultiIndexSearcher;
import org.apache.nifi.provenance.journaling.index.QueryUtils;
import org.apache.nifi.provenance.journaling.io.StandardEventSerializer;
import org.apache.nifi.provenance.journaling.journals.JournalReader;
@@ -47,8 +48,9 @@ import org.apache.nifi.provenance.journaling.journals.JournalWriter;
import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
import org.apache.nifi.provenance.journaling.journals.StandardJournalWriter;
import org.apache.nifi.provenance.journaling.tasks.CompressionTask;
+import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
import org.apache.nifi.provenance.journaling.toc.StandardTocWriter;
-import org.apache.nifi.provenance.journaling.toc.TocJournalReader;
+import org.apache.nifi.provenance.journaling.toc.TocReader;
import org.apache.nifi.provenance.journaling.toc.TocWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,29 +64,32 @@ public class JournalingPartition implements Partition {
private final File section;
private final File journalsDir;
- private final JournalingRepositoryConfig config;
+ private final IndexManager indexManager;
+ private final AtomicLong containerSize;
private final ExecutorService executor;
-
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
- private final Lock readLock = rwLock.readLock();
- private final Lock writeLock = rwLock.writeLock();
+ private final JournalingRepositoryConfig config;
private JournalWriter journalWriter;
private TocWriter tocWriter;
private int numEventsAtEndOfLastBlock = 0;
private volatile long maxEventId = -1L;
private volatile Long earliestEventTime = null;
+
+ private final Lock lock = new ReentrantLock();
+ private boolean writable = true; // guarded by lock
+ private final List<File> timeOrderedJournalFiles = Collections.synchronizedList(new ArrayList<File>());
+ private final AtomicLong partitionSize = new AtomicLong(0L);
- private final IndexManager indexManager;
-
- public JournalingPartition(final IndexManager indexManager, final String containerName, final int sectionIndex, final File sectionDir, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+ public JournalingPartition(final IndexManager indexManager, final String containerName, final int sectionIndex, final File sectionDir,
+ final JournalingRepositoryConfig config, final AtomicLong containerSize, final ExecutorService compressionExecutor) throws IOException {
this.indexManager = indexManager;
+ this.containerSize = containerSize;
this.containerName = containerName;
this.sectionIndex = sectionIndex;
this.section = sectionDir;
this.journalsDir = new File(section, "journals");
this.config = config;
- this.executor = executor;
+ this.executor = compressionExecutor;
if (!journalsDir.exists() && !journalsDir.mkdirs()) {
throw new IOException("Could not create directory " + section);
@@ -119,18 +124,24 @@ public class JournalingPartition implements Partition {
@Override
public List<JournaledProvenanceEvent> registerEvents(final Collection<ProvenanceEventRecord> events, final long firstEventId) throws IOException {
- writeLock.lock();
+ if ( events.isEmpty() ) {
+ return Collections.emptyList();
+ }
+
+ lock.lock();
try {
+ if ( !writable ) {
+ throw new IOException("Cannot write to partition " + this + " because there was previously a write failure. The partition will fix itself in time if I/O problems are resolved");
+ }
+
final JournalWriter writer = getJournalWriter(firstEventId);
- if ( !events.isEmpty() ) {
- final int eventsWritten = writer.getEventCount();
- if ( eventsWritten - numEventsAtEndOfLastBlock > config.getBlockSize() ) {
- writer.finishBlock();
- tocWriter.addBlockOffset(writer.getSize());
- numEventsAtEndOfLastBlock = eventsWritten;
- writer.beginNewBlock();
- }
+ final int eventsWritten = writer.getEventCount();
+ if ( eventsWritten - numEventsAtEndOfLastBlock > config.getBlockSize() ) {
+ writer.finishBlock();
+ tocWriter.addBlockOffset(writer.getSize());
+ numEventsAtEndOfLastBlock = eventsWritten;
+ writer.beginNewBlock();
}
writer.write(events, firstEventId);
@@ -139,7 +150,7 @@ public class JournalingPartition implements Partition {
long id = firstEventId;
for (final ProvenanceEventRecord event : events) {
final JournaledStorageLocation location = new JournaledStorageLocation(containerName, String.valueOf(sectionIndex),
- String.valueOf(writer.getJournalId()), tocWriter.getCurrentBlockIndex(), id++);
+ writer.getJournalId(), tocWriter.getCurrentBlockIndex(), id++);
final JournaledProvenanceEvent storedEvent = new JournaledProvenanceEvent(event, location);
storedEvents.add(storedEvent);
}
@@ -169,8 +180,11 @@ public class JournalingPartition implements Partition {
}
return storedEvents;
+ } catch (final IOException ioe) {
+ writable = false;
+ throw ioe;
} finally {
- writeLock.unlock();
+ lock.unlock();
}
}
@@ -194,43 +208,66 @@ public class JournalingPartition implements Partition {
return false;
}
+ private void updateSize(final long delta) {
+ partitionSize.addAndGet(delta);
+ containerSize.addAndGet(delta);
+ }
+
// MUST be called with write lock held.
+ /**
+ * Rolls over the current journal (if any) and begins writing to a new journal.
+ *
+ * <p>
+ * <b>NOTE:</b> This method MUST be called with the partition lock held!
+ * </p>
+ *
+ * @param firstEventId the ID of the first event to add to this journal
+ * @throws IOException if the active journal cannot be closed or a new journal cannot be created
+ */
private void rollover(final long firstEventId) throws IOException {
- // TODO: Rework how rollover works because we now have index manager!!
-
// if we have a writer already, close it and initiate rollover actions
+ final File finishedFile = journalWriter == null ? null : journalWriter.getJournalFile();
if ( journalWriter != null ) {
journalWriter.finishBlock();
journalWriter.close();
tocWriter.close();
- final EventIndexWriter curWriter = getIndexWriter();
+ final File finishedTocFile = tocWriter.getFile();
+ final JournalWriter finishedWriter = journalWriter; // capture before the field is reassigned below
+ updateSize(finishedFile.length());
+
executor.submit(new Runnable() {
@Override
public void run() {
- try {
- curWriter.sync();
- } catch (final IOException e) {
-
- // TODO Auto-generated catch block
- e.printStackTrace();
+ if ( config.isCompressOnRollover() ) {
+ final long originalSize = finishedFile.length();
+ final long compressedFileSize = new CompressionTask(finishedFile, finishedWriter.getJournalId(), finishedTocFile).call();
+ final long sizeAdded = compressedFileSize - originalSize;
+ updateSize(sizeAdded);
}
}
});
- if ( config.isCompressOnRollover() ) {
- final File finishedFile = journalWriter.getJournalFile();
- final File finishedTocFile = tocWriter.getFile();
- executor.submit(new CompressionTask(finishedFile, journalWriter.getJournalId(), finishedTocFile));
- }
+ timeOrderedJournalFiles.add(finishedFile);
}
// create new writers and reset state.
final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);
journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());
- tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync());
- tocWriter.addBlockOffset(journalWriter.getSize());
- numEventsAtEndOfLastBlock = 0;
+ try {
+ tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false, config.isAlwaysSync());
+ tocWriter.addBlockOffset(journalWriter.getSize());
+ numEventsAtEndOfLastBlock = 0;
+ } catch (final Exception e) {
+ try {
+ journalWriter.close();
+ } catch (final IOException ioe) {}
+
+ journalWriter = null;
+
+ throw e;
+ }
+
+ logger.debug("Rolling over {} from {} to {}", this, finishedFile, journalFile);
}
@@ -252,33 +289,24 @@ public class JournalingPartition implements Partition {
@Override
public void restore() throws IOException {
- writeLock.lock();
+ lock.lock();
try {
// delete or rename files if stopped during rollover; compress any files that haven't been compressed
if ( !config.isReadOnly() ) {
final File[] children = journalsDir.listFiles();
if ( children != null ) {
- // find the latest journal.
- File latestJournal = null;
- long latestJournalId = -1L;
-
final List<File> journalFiles = new ArrayList<>();
// find any journal files that either haven't been compressed or were partially compressed when
// we last shutdown and then restart compression.
for ( final File file : children ) {
final String filename = file.getName();
- if ( !filename.contains(JOURNAL_FILE_EXTENSION) ) {
+ if ( !filename.endsWith(JOURNAL_FILE_EXTENSION) ) {
continue;
}
- final Long journalId = getJournalId(file);
- if ( journalId != null && journalId > latestJournalId ) {
- latestJournal = file;
- latestJournalId = journalId;
- }
-
journalFiles.add(file);
+ updateSize(file.length());
if ( !config.isCompressOnRollover() ) {
continue;
@@ -290,7 +318,15 @@ public class JournalingPartition implements Partition {
// both the compressed and uncompressed version of this journal exist. The Compression Task was
// not complete when we shutdown. Delete the compressed journal and toc and re-start the Compression Task.
final File tocFile = QueryUtils.getTocFile(uncompressedFile);
- executor.submit(new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile));
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ final long originalSize = uncompressedFile.length();
+ final long compressedSize = new CompressionTask(uncompressedFile, getJournalId(uncompressedFile), tocFile).call();
+ final long sizeAdded = compressedSize - originalSize;
+ updateSize(sizeAdded);
+ }
+ });
} else {
// The compressed file exists but the uncompressed file does not. This means that we have finished
// writing the compressed file and deleted the original journal file but then shutdown before
@@ -321,6 +357,22 @@ public class JournalingPartition implements Partition {
}
}
+ // we want to sort the list of all journal files.
+ // we need to create a map of file to last mod time, rather than comparing
+ // by using File.lastModified() because the File.lastModified() value could potentially
+ // change while running the comparator, which violates the comparator's contract.
+ timeOrderedJournalFiles.addAll(journalFiles);
+ final Map<File, Long> lastModTimes = new HashMap<>();
+ for ( final File journalFile : journalFiles ) {
+ lastModTimes.put(journalFile, journalFile.lastModified());
+ }
+ Collections.sort(timeOrderedJournalFiles, new Comparator<File>() {
+ @Override
+ public int compare(final File o1, final File o2) {
+ return lastModTimes.get(o1).compareTo(lastModTimes.get(o2));
+ }
+ });
+
// Get the first event in the earliest journal file so that we know what the earliest time available is
Collections.sort(journalFiles, new Comparator<File>() {
@Override
@@ -332,61 +384,83 @@ public class JournalingPartition implements Partition {
for ( final File journal : journalFiles ) {
try (final JournalReader reader = new StandardJournalReader(journal)) {
final ProvenanceEventRecord record = reader.nextEvent();
- this.earliestEventTime = record.getEventTime();
- break;
+ if ( record != null ) {
+ this.earliestEventTime = record.getEventTime();
+ break;
+ }
} catch (final IOException ioe) {
}
}
-
- // Whatever was the last journal for this partition, we need to remove anything for that journal
- // from the index and re-add them, and then sync the index. This allows us to avoid syncing
- // the index each time (we sync only on rollover) but allows us to still ensure that we index
- // all events.
- if ( latestJournal != null ) {
+
+ // order such that latest journal file is first.
+ Collections.reverse(journalFiles);
+ for ( final File journal : journalFiles ) {
+ try (final JournalReader reader = new StandardJournalReader(journal);
+ final TocReader tocReader = new StandardTocReader(QueryUtils.getTocFile(journal))) {
+
+ final long lastBlockOffset = tocReader.getLastBlockOffset();
+ final ProvenanceEventRecord lastEvent = reader.getLastEvent(lastBlockOffset);
+ if ( lastEvent != null ) {
+ maxEventId = lastEvent.getEventId() + 1;
+ break;
+ }
+ } catch (final EOFException eof) {}
+ }
+
+ // We need to re-index all of the journal files that have not been indexed. We can do this by determining
+ // what is the largest event id that has been indexed for this container and section, and then re-indexing
+ // any file that has an event with an id larger than that.
+ // In order to do that, we iterate over the journal files in the order of newest (largest id) to oldest
+ // (smallest id). If the first event id in a file is greater than the max indexed, we re-index the file.
+ // Beyond that, we need to re-index one additional journal file because it's possible that if the first id
+ // is 10 and the max index id is 15, the file containing 10 could also go up to 20. So we re-index one
+ // file that has a min id less than what has been indexed; then we are done.
+ final Long maxIndexedId = indexManager.getMaxEventId(containerName, String.valueOf(sectionIndex));
+ final List<File> reindexJournals = new ArrayList<>();
+ for ( final File journalFile : journalFiles ) {
+ final Long firstEventId;
try {
- reindex(latestJournal);
- } catch (final EOFException eof) {
+ firstEventId = getJournalId(journalFile);
+ } catch (final NumberFormatException nfe) {
+ // not a journal; skip this file
+ continue;
}
+
+ if ( maxIndexedId == null || firstEventId > maxIndexedId ) {
+ reindexJournals.add(journalFile);
+ } else {
+ reindexJournals.add(journalFile);
+ break;
+ }
+ }
+
+ // Make sure that the indexes are not pointing to events that no longer exist.
+ if ( journalFiles.isEmpty() ) {
+ indexManager.deleteEventsBefore(containerName, sectionIndex, Long.MAX_VALUE);
+ } else {
+ // journalFiles is ordered newest to oldest at this point, so the oldest journal is the last element.
+ final File oldestJournalFile = journalFiles.get(journalFiles.size() - 1);
+ indexManager.deleteEventsBefore(containerName, sectionIndex, getJournalId(oldestJournalFile));
+ }
+
+ // The reindexJournals list is currently in order of newest to oldest. We need to re-index
+ // in order of oldest to newest, so reverse the list.
+ Collections.reverse(reindexJournals);
+
+ logger.info("Reindexing {} journal files that were not found in index for container {} and section {}", reindexJournals.size(), containerName, sectionIndex);
+ final long reindexStart = System.nanoTime();
+ for ( final File journalFile : reindexJournals ) {
+ indexManager.reindex(containerName, sectionIndex, getJournalId(journalFile), journalFile);
}
+ final long reindexMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - reindexStart);
+ logger.info("Finished reindexing {} journal files for container {} and section {}; reindex took {} millis",
+ reindexJournals.size(), containerName, sectionIndex, reindexMillis);
}
}
} finally {
- writeLock.unlock();
+ lock.unlock();
}
}
-
- private void reindex(final File journalFile) throws IOException {
- // TODO: Rework how recovery works because we now have index manager!!
- try (final TocJournalReader reader = new TocJournalReader(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)), journalFile)) {
- // We don't know which index contains the data for this journal, so remove the journal
- // from both.
- for (final LuceneIndexWriter indexWriter : indexWriters ) {
- indexWriter.delete(containerName, String.valueOf(sectionIndex), String.valueOf(getJournalId(journalFile)));
- }
-
- long maxId = -1L;
- final List<JournaledProvenanceEvent> storedEvents = new ArrayList<>(1000);
- JournaledProvenanceEvent event;
- final LuceneIndexWriter indexWriter = indexWriters[0];
- while ((event = reader.nextJournaledEvent()) != null ) {
- storedEvents.add(event);
- maxId = event.getEventId();
-
- if ( storedEvents.size() == 1000 ) {
- indexWriter.index(storedEvents);
- storedEvents.clear();
- }
- }
-
- if ( !storedEvents.isEmpty() ) {
- indexWriter.index(storedEvents);
- }
-
- indexWriter.sync();
- this.maxEventId = maxId;
- }
- }
@Override
@@ -443,4 +517,129 @@ public class JournalingPartition implements Partition {
public String toString() {
return "Partition[section=" + sectionIndex + "]";
}
+
+ @Override
+ public void verifyWritable(final long nextId) throws IOException {
+ final long freeSpace = section.getFreeSpace();
+ final long freeMegs = freeSpace / 1024 / 1024;
+ if (freeMegs < 10) {
+ // if not at least 10 MB, don't even try to write
+ throw new IOException("Not Enough Disk Space: partition housing " + section + " has only " + freeMegs + " MB of storage available");
+ }
+
+ rollover(nextId);
+ writable = true;
+ }
+
+ private boolean delete(final File journalFile) {
+ for (int i=0; i < 10; i++) {
+ if ( journalFile.delete() || !journalFile.exists() ) {
+ return true;
+ } else {
+ try {
+ Thread.sleep(100L);
+ } catch (final InterruptedException ie) {}
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void deleteOldEvents(final long earliestEventTimeToDelete) throws IOException {
+ final Set<File> removeFromTimeOrdered = new HashSet<>();
+
+ final long start = System.nanoTime();
+ try {
+ for ( final File journalFile : timeOrderedJournalFiles ) {
+ // since these are time-ordered (oldest first), once we find one that is too new to delete, we're done.
+ if ( journalFile.lastModified() >= earliestEventTimeToDelete ) {
+ break;
+ }
+
+ final long journalSize;
+ if ( journalFile.exists() ) {
+ journalSize = journalFile.length();
+ } else {
+ continue;
+ }
+
+ if ( delete(journalFile) ) {
+ removeFromTimeOrdered.add(journalFile);
+ updateSize(-journalSize);
+ } else {
+ logger.warn("Failed to remove expired journal file {}; will attempt to delete again later", journalFile);
+ }
+
+ final File tocFile = QueryUtils.getTocFile(journalFile);
+ if ( !delete(tocFile) ) {
+ logger.warn("Failed to remove TOC file for expired journal file {}; will attempt to delete again later", journalFile);
+ }
+ }
+ } finally {
+ timeOrderedJournalFiles.removeAll(removeFromTimeOrdered);
+ }
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.info("Removed {} expired journal files from container {}, section {}; total time for deletion was {} millis",
+ removeFromTimeOrdered.size(), containerName, sectionIndex, millis);
+ }
+
+
+ @Override
+ public void deleteOldest() throws IOException {
+ File removeFromTimeOrdered = null;
+
+ final long start = System.nanoTime();
+ try {
+ for ( final File journalFile : timeOrderedJournalFiles ) {
+ final long journalSize;
+ if ( journalFile.exists() ) {
+ journalSize = journalFile.length();
+ } else {
+ continue;
+ }
+
+ if ( delete(journalFile) ) {
+ removeFromTimeOrdered = journalFile;
+ } else {
+ throw new IOException("Cannot delete oldest event file " + journalFile);
+ }
+
+ final File tocFile = QueryUtils.getTocFile(journalFile);
+ if ( !delete(tocFile) ) {
+ logger.warn("Failed to remove TOC file for expired journal file {}; will attempt to delete again later", journalFile);
+ }
+
+ updateSize(-journalSize);
+ indexManager.deleteEvents(containerName, sectionIndex, getJournalId(journalFile));
+
+ // only the oldest existing journal should be removed; stop after the first successful delete.
+ break;
+ }
+ } finally {
+ if ( removeFromTimeOrdered != null ) {
+ timeOrderedJournalFiles.remove(removeFromTimeOrdered);
+
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ logger.info("Removed oldest event file {} from container {}, section {}; total time for deletion was {} millis",
+ removeFromTimeOrdered, containerName, sectionIndex, millis);
+ } else {
+ logger.debug("No journals to remove for {}", this);
+ }
+ }
+ }
+
+
+ @Override
+ public long getPartitionSize() {
+ return partitionSize.get();
+ }
+
+ @Override
+ public long getContainerSize() {
+ return containerSize.get();
+ }
+
+ @Override
+ public String getContainerName() {
+ return containerName;
+ }
}
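
The snapshot-then-sort idiom in restore() above (copying each file's lastModified() into a Map before sorting) exists because a Comparator that calls File.lastModified() directly can see the value change mid-sort, violating the Comparator contract; since Java 7, TimSort surfaces that as "Comparison method violates its general contract!". A minimal standalone sketch of the same idiom, using only the JDK (the "journals" directory name is hypothetical):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SnapshotSortSketch {
        public static void main(final String[] args) {
            // Hypothetical journals directory; any directory works for the demo.
            final File[] children = new File("journals").listFiles();
            final List<File> journalFiles = new ArrayList<>();
            if (children != null) {
                Collections.addAll(journalFiles, children);
            }

            // Snapshot each file's last-modified time exactly once, so the sort key
            // cannot change while the sort is running.
            final Map<File, Long> lastModTimes = new HashMap<>();
            for (final File file : journalFiles) {
                lastModTimes.put(file, file.lastModified());
            }

            Collections.sort(journalFiles, new Comparator<File>() {
                @Override
                public int compare(final File o1, final File o2) {
                    return lastModTimes.get(o1).compareTo(lastModTimes.get(o2));
                }
            });

            for (final File file : journalFiles) {
                System.out.println(lastModTimes.get(file) + "  " + file.getName());
            }
        }
    }

The cost is one extra pass over the files, which is negligible next to the stability it buys.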
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
index e77c8d5..9efae04 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/Partition.java
@@ -66,6 +66,12 @@ public interface Partition {
long getMaxEventId();
/**
+ * Returns the name of the container that this Partition operates on
+ * @return the name of the container
+ */
+ String getContainerName();
+
+ /**
* Returns the locations of events that have an id at least equal to minEventId, returning the events
* with the smallest ID's possible that are greater than minEventId
*
@@ -82,4 +88,39 @@ public interface Partition {
* @throws IOException
*/
Long getEarliestEventTime() throws IOException;
+
+ /**
+ * Verifies that the partition is able to be written to. A Partition may need to create a new journal
+ * in order to verify. In this case, the nextId is provided so that the Partition knows the minimum event id
+ * that will be written to the partition.
+ *
+ * @param nextId the minimum event id that will next be written to this partition
+ * @throws IOException if the partition cannot currently be written to
+ */
+ void verifyWritable(long nextId) throws IOException;
+
+
+ /**
+ * Deletes any journal for this partition that occurred before the given time
+ * @param earliestEventTimeToDelete the cutoff timestamp; journals older than this are deleted
+ * @throws IOException if a journal cannot be deleted
+ */
+ void deleteOldEvents(long earliestEventTimeToDelete) throws IOException;
+
+ /**
+ * Returns the size of this partition in bytes
+ * @return the partition size in bytes
+ */
+ long getPartitionSize();
+
+ /**
+ * Returns the size of the journals in the entire container
+ * @return the total size, in bytes, of all journals in the container
+ */
+ long getContainerSize();
+
+ /**
+ * Deletes the oldest journal from this partition
+ * @throws IOException
+ */
+ void deleteOldest() throws IOException;
}
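
A rough sketch of how a caller might drive the size-based retention contract this interface now exposes (getPartitionSize() and deleteOldest()); the InMemoryPartition stand-in and the byte counts are hypothetical, not the repository's actual wiring:

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.Deque;

    public class RetentionSketch {
        // Tiny in-memory stand-in exposing just the two methods the loop needs.
        static class InMemoryPartition {
            private final Deque<Long> journalSizes = new ArrayDeque<>();

            void addJournal(final long bytes) {
                journalSizes.addLast(bytes);
            }

            long getPartitionSize() {
                long total = 0L;
                for (final long size : journalSizes) {
                    total += size;
                }
                return total;
            }

            void deleteOldest() throws IOException {
                if (journalSizes.isEmpty()) {
                    throw new IOException("no journals left to delete");
                }
                journalSizes.removeFirst();
            }
        }

        // Delete oldest journals until the partition fits under the cap.
        static void enforceCap(final InMemoryPartition partition, final long maxBytes) throws IOException {
            while (partition.getPartitionSize() > maxBytes) {
                partition.deleteOldest();
            }
        }

        public static void main(final String[] args) throws IOException {
            final InMemoryPartition partition = new InMemoryPartition();
            partition.addJournal(400L);
            partition.addJournal(300L);
            partition.addJournal(200L);
            enforceCap(partition, 600L);
            System.out.println("size after enforcement: " + partition.getPartitionSize()); // prints 500
        }
    }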
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
index c0a56c4..14d5b17 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/PartitionManager.java
@@ -52,10 +52,23 @@ public interface PartitionManager {
* @param writeAction specifies whether or not the action writes to the repository
* @return
*/
- <T> Set<T> withEachPartition(PartitionAction<T> action) throws IOException;
+// <T> Set<T> withEachPartition(PartitionAction<T> action, boolean writeAction) throws IOException;
/**
- * Performs the given Action on each partition and returns the set of results. Unlike
+ * Performs the given Action on each partition serially and returns the set of results. This method
+ * does not use the thread pool to run the action in parallel. This is desirable for very quick
+ * functions, because a fully utilized thread pool can cause a quick function to take far longer
+ * than it should.
+ *
+ * @param action the action to perform
+ * @param writeAction specifies whether or not the action writes to the repository
+ * @return
+ */
+ <T> Set<T> withEachPartitionSerially(PartitionAction<T> action, boolean writeAction) throws IOException;
+
+
+ /**
+ * Performs the given Action on each partition. Unlike
* {@link #withEachPartition(PartitionAction)}, this method does not use the thread pool
* in order to perform the request in parallel. This is desirable for very quick functions,
* as the thread pool can be fully utilized, resulting in a quick function taking far longer
@@ -65,7 +78,7 @@ public interface PartitionManager {
* @param writeAction specifies whether or not the action writes to the repository
* @return
*/
- <T> Set<T> withEachPartitionSerially(PartitionAction<T> action) throws IOException;
+ void withEachPartitionSerially(VoidPartitionAction action, boolean writeAction) throws IOException;
/**
* Performs the given Action to each partition, optionally waiting for the action to complete
@@ -74,7 +87,13 @@ public interface PartitionManager {
* @param async if <code>true</code>, will perform the action asynchronously; if <code>false</code>, will
* wait for the action to complete before returning
*/
- void withEachPartition(VoidPartitionAction action, boolean async);
+// void withEachPartition(VoidPartitionAction action, boolean async);
void shutdown();
+
+ /**
+ * Triggers the Partition Manager to delete events from journals and indices based on the sizes of the containers
+ * and overall size of the repository
+ */
+ void deleteEventsBasedOnSize();
}
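
The reworked withEachPartitionSerially() runs the action on the calling thread for each partition and fails fast for write actions when any partition is blacklisted. A toy illustration of that shape, with String partition names and a hypothetical PartitionAction standing in for the real interfaces:

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    public class SerialFanOutSketch {
        interface PartitionAction<T> {
            T perform(String partition) throws IOException;
        }

        // Hypothetical stand-ins for the manager's partition array and blacklist.
        private final String[] partitions = { "p0", "p1", "p2" };
        private final Set<String> blacklisted = new HashSet<>();

        <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action, final boolean writeAction) throws IOException {
            if (writeAction && !blacklisted.isEmpty()) {
                throw new IOException("Cannot perform action because at least one partition has been blacklisted");
            }

            final Set<T> results = new HashSet<>();
            for (final String partition : partitions) {
                results.add(action.perform(partition)); // runs on the calling thread; no thread pool involved
            }
            return results;
        }

        public static void main(final String[] args) throws IOException {
            final SerialFanOutSketch manager = new SerialFanOutSketch();
            final Set<Integer> lengths = manager.withEachPartitionSerially(new PartitionAction<Integer>() {
                @Override
                public Integer perform(final String partition) {
                    return partition.length();
                }
            }, false);
            System.out.println(lengths); // prints [2]
        }
    }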
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
index 10af697..9b5c442 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -19,19 +19,18 @@ package org.apache.nifi.provenance.journaling.partition;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
import org.apache.nifi.provenance.journaling.index.IndexManager;
@@ -43,16 +42,26 @@ public class QueuingPartitionManager implements PartitionManager {
private static final Logger logger = LoggerFactory.getLogger(QueuingPartitionManager.class);
+ private final IndexManager indexManager;
private final JournalingRepositoryConfig config;
private final BlockingQueue<Partition> partitionQueue;
private final JournalingPartition[] partitionArray;
- private final ExecutorService executor;
+ private final AtomicLong eventIdGenerator;
private volatile boolean shutdown = false;
- private final AtomicInteger blacklistedCount = new AtomicInteger(0);
+ private final Set<Partition> blackListedPartitions = Collections.synchronizedSet(new HashSet<Partition>());
- public QueuingPartitionManager(final IndexManager indexManager, final JournalingRepositoryConfig config, final ExecutorService executor) throws IOException {
+ public QueuingPartitionManager(final IndexManager indexManager, final AtomicLong eventIdGenerator, final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService compressionExecutor) throws IOException {
+ this.indexManager = indexManager;
this.config = config;
+ this.eventIdGenerator = eventIdGenerator;
+
+ // We can consider using a PriorityQueue here instead. Keep track of how many Partitions are being written
+ // to for each container, as a container usually maps to a physical drive. Then, prioritize the queue
+ // so that the partitions that belong to Container A get a higher priority than those belonging to Container B
+ // if there are currently more partitions on Container B being written to (i.e., we prefer a partition for the
+ // container that is the least used at this moment). Would require significant performance testing to see if it
+ // really provides any benefit.
this.partitionQueue = new LinkedBlockingQueue<>(config.getPartitionCount());
this.partitionArray = new JournalingPartition[config.getPartitionCount()];
@@ -61,16 +70,24 @@ public class QueuingPartitionManager implements PartitionManager {
containerTuples.add(new Tuple<>(entry.getKey(), entry.getValue()));
}
+ final Map<String, AtomicLong> containerSizes = new HashMap<>();
+ for ( final String containerName : config.getContainers().keySet() ) {
+ containerSizes.put(containerName, new AtomicLong(0L));
+ }
+
for (int i=0; i < config.getPartitionCount(); i++) {
final Tuple<String, File> tuple = containerTuples.get(i % containerTuples.size());
final File section = new File(tuple.getValue(), String.valueOf(i));
- final JournalingPartition partition = new JournalingPartition(indexManager, tuple.getKey(), i, section, config, executor);
+ final String containerName = tuple.getKey();
+ final JournalingPartition partition = new JournalingPartition(indexManager, containerName, i,
+ section, config, containerSizes.get(containerName), compressionExecutor);
+ partition.restore();
partitionQueue.offer(partition);
partitionArray[i] = partition;
}
- this.executor = executor;
+ workerExecutor.scheduleWithFixedDelay(new CheckBlackListedPartitions(), 30, 30, TimeUnit.SECONDS);
}
@Override
@@ -82,32 +99,63 @@ public class QueuingPartitionManager implements PartitionManager {
}
}
- private Partition nextPartition() {
+ private Partition nextPartition(final boolean writeAction) {
Partition partition = null;
- while(partition == null) {
- if (shutdown) {
- throw new RuntimeException("Journaling Provenance Repository is shutting down");
- }
-
- try {
- partition = partitionQueue.poll(1, TimeUnit.SECONDS);
- } catch (final InterruptedException ie) {
- }
-
- if ( partition == null ) {
- if ( blacklistedCount.get() >= config.getPartitionCount() ) {
- throw new RuntimeException("Cannot persist to the Journal Provenance Repository because all partitions have been blacklisted due to write failures");
+ final List<Partition> partitionsSkipped = new ArrayList<>();
+ try {
+ while (partition == null) {
+ if (shutdown) {
+ throw new RuntimeException("Journaling Provenance Repository is shutting down");
+ }
+
+ try {
+ partition = partitionQueue.poll(1, TimeUnit.SECONDS);
+ } catch (final InterruptedException ie) {
+ }
+
+ if ( partition == null ) {
+ if ( blackListedPartitions.size() >= config.getPartitionCount() ) {
+ throw new RuntimeException("Cannot persist to the Journal Provenance Repository because all partitions have been blacklisted due to write failures");
+ }
+
+ // we are out of partitions. Add back all of the partitions that we skipped so we
+ // can try them again.
+ partitionQueue.addAll(partitionsSkipped);
+ partitionsSkipped.clear();
+ } else if (writeAction) {
+ // determine if the container is full.
+ final String containerName = partition.getContainerName();
+ long desiredMaxContainerCapacity = config.getMaxCapacity(containerName);
+
+ // If no max capacity set for the container itself, use 1/N of repo max
+ // where N is the number of containers
+ if ( desiredMaxContainerCapacity == config.getMaxStorageCapacity() ) {
+ desiredMaxContainerCapacity = config.getMaxStorageCapacity() / config.getContainers().size();
+ }
+
+ // if the partition is more than 10% over its desired capacity, we don't want to write to it.
+ if ( partition.getContainerSize() > 1.1 * desiredMaxContainerCapacity ) {
+ partitionsSkipped.add(partition);
+ partition = null; // force another pass; this partition is over capacity
+ continue;
+ }
}
}
+ } finally {
+ partitionQueue.addAll( partitionsSkipped );
}
return partition;
}
+
+ private void blackList(final Partition partition) {
+ blackListedPartitions.add(partition);
+ }
+
@Override
public <T> T withPartition(final PartitionAction<T> action, final boolean writeAction) throws IOException {
- final Partition partition = nextPartition();
+ final Partition partition = nextPartition(writeAction);
boolean ioe = false;
try {
@@ -117,8 +165,7 @@ public class QueuingPartitionManager implements PartitionManager {
throw e;
} finally {
if ( ioe && writeAction ) {
- // We failed to write to this Partition. This partition will no longer be usable until NiFi is restarted!
- blacklistedCount.incrementAndGet();
+ blackList(partition);
} else {
partitionQueue.offer(partition);
}
@@ -127,7 +174,7 @@ public class QueuingPartitionManager implements PartitionManager {
@Override
public void withPartition(final VoidPartitionAction action, final boolean writeAction) throws IOException {
- final Partition partition = nextPartition();
+ final Partition partition = nextPartition(writeAction);
boolean ioe = false;
try {
@@ -137,8 +184,7 @@ public class QueuingPartitionManager implements PartitionManager {
throw e;
} finally {
if ( ioe && writeAction ) {
- // We failed to write to this Partition. This partition will no longer be usable until NiFi is restarted!
- blacklistedCount.incrementAndGet();
+ blackList(partition);
} else {
partitionQueue.offer(partition);
}
@@ -146,89 +192,234 @@ public class QueuingPartitionManager implements PartitionManager {
}
+// @Override
+// public <T> Set<T> withEachPartition(final PartitionAction<T> action) throws IOException {
+// if ( writeAction && blackListedPartitions.size() > 0 ) {
+// throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)");
+// }
+//
+// final Set<T> results = new HashSet<>(partitionArray.length);
+//
+// final Map<Partition, Future<T>> futures = new HashMap<>(partitionArray.length);
+// for ( final Partition partition : partitionArray ) {
+// final Callable<T> callable = new Callable<T>() {
+// @Override
+// public T call() throws Exception {
+// return action.perform(partition);
+// }
+// };
+//
+// final Future<T> future = executor.submit(callable);
+// futures.put(partition, future);
+// }
+//
+// for ( final Map.Entry<Partition, Future<T>> entry : futures.entrySet() ) {
+// try {
+// final T result = entry.getValue().get();
+// results.add(result);
+// } catch (final ExecutionException ee) {
+// final Throwable cause = ee.getCause();
+// if ( cause instanceof IOException ) {
+// throw (IOException) cause;
+// } else {
+// throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+// }
+// } catch (InterruptedException e) {
+// throw new RuntimeException(e);
+// }
+// }
+//
+// return results;
+// }
+
@Override
- public <T> Set<T> withEachPartition(final PartitionAction<T> action) throws IOException {
- final Set<T> results = new HashSet<>(partitionArray.length);
-
- // TODO: Do not use blacklisted partitions.
- final Map<Partition, Future<T>> futures = new HashMap<>(partitionArray.length);
- for ( final Partition partition : partitionArray ) {
- final Callable<T> callable = new Callable<T>() {
- @Override
- public T call() throws Exception {
- return action.perform(partition);
- }
- };
-
- final Future<T> future = executor.submit(callable);
- futures.put(partition, future);
+ public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action, final boolean writeAction) throws IOException {
+ if ( writeAction && blackListedPartitions.size() > 0 ) {
+ throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)");
}
- for ( final Map.Entry<Partition, Future<T>> entry : futures.entrySet() ) {
- try {
- final T result = entry.getValue().get();
- results.add(result);
- } catch (final ExecutionException ee) {
- final Throwable cause = ee.getCause();
- if ( cause instanceof IOException ) {
- throw (IOException) cause;
- } else {
- throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ final Set<T> results = new HashSet<>(partitionArray.length);
+ for ( final Partition partition : partitionArray ) {
+ results.add( action.perform(partition) );
}
return results;
}
@Override
- public <T> Set<T> withEachPartitionSerially(final PartitionAction<T> action) throws IOException {
- // TODO: Do not use blacklisted partitions.
- final Set<T> results = new HashSet<>(partitionArray.length);
+ public void withEachPartitionSerially(final VoidPartitionAction action, final boolean writeAction) throws IOException {
+ if ( writeAction && blackListedPartitions.size() > 0 ) {
+ throw new IOException("Cannot perform action {} because at least one partition has been blacklisted (i.e., writint to the partition failed)");
+ }
+
for ( final Partition partition : partitionArray ) {
- results.add( action.perform(partition) );
+ action.perform(partition);
+ }
+ }
+
+// @Override
+// public void withEachPartition(final VoidPartitionAction action, final boolean async) {
+// // TODO: skip blacklisted partitions
+// final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);
+// for ( final Partition partition : partitionArray ) {
+// final Runnable runnable = new Runnable() {
+// @Override
+// public void run() {
+// try {
+// action.perform(partition);
+// } catch (final Throwable t) {
+// logger.error("Failed to perform action against " + partition + " due to " + t);
+// if ( logger.isDebugEnabled() ) {
+// logger.error("", t);
+// }
+// }
+// }
+// };
+//
+// final Future<?> future = executor.submit(runnable);
+// futures.put(partition, future);
+// }
+//
+// if ( !async ) {
+// for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) {
+// try {
+// // throw any exception thrown by runnable
+// entry.getValue().get();
+// } catch (final ExecutionException ee) {
+// final Throwable cause = ee.getCause();
+// throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+// } catch (InterruptedException e) {
+// throw new RuntimeException(e);
+// }
+// }
+// }
+// }
+
+ private long getTotalSize() {
+ long totalSize = 0L;
+
+ for ( final JournalingPartition partition : partitionArray ) {
+ totalSize += partition.getPartitionSize();
}
- return results;
+ for ( final String containerName : config.getContainers().keySet() ) {
+ totalSize += indexManager.getSize(containerName);
+ }
+
+ return totalSize;
+ }
+
+
+ /**
+ * Responsible for looking at partitions that have been marked as blacklisted and checking if they
+ * are able to be written to now. If so, adds them back to the partition queue; otherwise, leaves
+ * them as blacklisted
+ */
+ private class CheckBlackListedPartitions implements Runnable {
+ @Override
+ public void run() {
+ final Set<Partition> reclaimed = new HashSet<>();
+
+ final Set<Partition> partitions = new HashSet<>(blackListedPartitions);
+ for ( final Partition partition : partitions ) {
+ final long nextId = eventIdGenerator.get();
+ if ( nextId <= 0 ) {
+ // we don't have an ID to use yet. Don't attempt to do anything yet.
+ return;
+ }
+
+ try {
+ partition.verifyWritable(nextId);
+ reclaimed.add(partition);
+ } catch (final IOException ioe) {
+ logger.debug("{} is still blackListed due to {}", partition, ioe);
+ }
+ }
+
+ // any partition that is reclaimable is now removed from the set of blacklisted
+ // partitions and added back to our queue of partitions
+ blackListedPartitions.removeAll(reclaimed);
+ partitionQueue.addAll(reclaimed);
+ }
}
+
@Override
- public void withEachPartition(final VoidPartitionAction action, final boolean async) {
- // TODO: Do not use blacklisted partitions.
- final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);
- for ( final Partition partition : partitionArray ) {
- final Runnable runnable = new Runnable() {
- @Override
- public void run() {
+ public void deleteEventsBasedOnSize() {
+ final Map<String, List<JournalingPartition>> containerPartitionMap = new HashMap<>();
+
+ for ( final JournalingPartition partition : partitionArray ) {
+ final String container = partition.getContainerName();
+ List<JournalingPartition> list = containerPartitionMap.get(container);
+ if ( list == null ) {
+ list = new ArrayList<>();
+ containerPartitionMap.put(container, list);
+ }
+
+ list.add(partition);
+ }
+
+ int iterations = 0;
+ for ( final String containerName : config.getContainers().keySet() ) {
+ // continue as long as we need to delete data from this container.
+ while (true) {
+ // don't hammer the disks if we can't delete anything
+ if ( iterations++ > 0 ) {
+ try {
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) {}
+ }
+
+ final List<JournalingPartition> containerPartitions = containerPartitionMap.get(containerName);
+ final long containerSize = containerPartitions.get(0).getContainerSize();
+ final long maxContainerCapacity = config.getMaxCapacity(containerName);
+ if ( containerSize < maxContainerCapacity ) {
+ break;
+ }
+
+ logger.debug("Container {} exceeds max capacity of {} bytes with a size of {} bytes; deleting oldest events", containerName, maxContainerCapacity, containerSize);
+
+ // container is too large. Delete oldest journal from each partition in this container.
+ for ( final Partition partition : containerPartitions ) {
try {
- action.perform(partition);
- } catch (final Throwable t) {
- logger.error("Failed to perform action against " + partition + " due to " + t);
+ partition.deleteOldest();
+ } catch (final IOException ioe) {
+ logger.error("Failed to delete events from {} due to {}", partition, ioe.toString());
if ( logger.isDebugEnabled() ) {
- logger.error("", t);
+ logger.error("", ioe);
}
}
}
- };
-
- final Future<?> future = executor.submit(runnable);
- futures.put(partition, future);
+ }
}
- if ( !async ) {
- for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) {
+ long totalSize;
+ iterations = 0;
+ while ((totalSize = getTotalSize()) >= config.getMaxStorageCapacity()) {
+ logger.debug("Provenance Repository exceeds max capacity of {} bytes with a size of {}; deleting oldest events", config.getMaxStorageCapacity(), totalSize);
+
+ // don't hammer the disks if we can't delete anything
+ if ( iterations++ > 0 ) {
try {
- // throw any exception thrown by runnable
- entry.getValue().get();
- } catch (final ExecutionException ee) {
- final Throwable cause = ee.getCause();
- throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ Thread.sleep(1000L);
+ } catch (final InterruptedException ie) {}
+ }
+
+ for ( final Partition partition : partitionArray ) {
+ try {
+ partition.deleteOldest();
+ } catch (final IOException ioe) {
+ logger.error("Failed to delete events from {} due to {}", partition, ioe.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", ioe);
+ }
}
}
}
}
}
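
The CheckBlackListedPartitions task above is a probe-and-reclaim loop: on a fixed delay, each blacklisted partition is asked to verify that it is writable again, and partitions that pass rejoin the queue. A self-contained sketch of the pattern, with String partition names and a probe that simply succeeds (both hypothetical):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class BlacklistReclaimSketch {
        private final BlockingQueue<String> partitionQueue = new LinkedBlockingQueue<>();
        private final Set<String> blacklisted = Collections.synchronizedSet(new HashSet<String>());

        // Stand-in for Partition.verifyWritable(nextId): a real check would test free
        // space and roll over a journal; here the probe simply succeeds.
        private void verifyWritable(final String partition) throws IOException {
        }

        void scheduleReclaimer(final ScheduledExecutorService workerExecutor) {
            workerExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    final Set<String> reclaimed = new HashSet<>();
                    // copy to avoid holding the set's lock while probing
                    for (final String partition : new HashSet<>(blacklisted)) {
                        try {
                            verifyWritable(partition);
                            reclaimed.add(partition);
                        } catch (final IOException ioe) {
                            // still failing; leave it blacklisted and retry on the next pass
                        }
                    }
                    blacklisted.removeAll(reclaimed);
                    partitionQueue.addAll(reclaimed); // writers can claim them again
                }
            }, 0, 30, TimeUnit.SECONDS);
        }

        public static void main(final String[] args) throws InterruptedException {
            final BlacklistReclaimSketch sketch = new BlacklistReclaimSketch();
            sketch.blacklisted.add("partition-1");
            final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            sketch.scheduleReclaimer(executor);
            Thread.sleep(200L);
            System.out.println("reclaimed: " + sketch.partitionQueue);
            executor.shutdownNow();
        }
    }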
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f23f36d7/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
index 4edc6ad..c5516aa 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
@@ -16,10 +16,14 @@
*/
package org.apache.nifi.provenance.journaling.query;
+import java.io.Closeable;
+
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QuerySubmission;
-public interface QueryManager {
+public interface QueryManager extends Closeable {
/**
* Submits an asynchronous request to process the given query, returning an
* identifier that can be used to fetch the results at a later time
@@ -39,4 +43,51 @@ public interface QueryManager {
* @return
*/
QuerySubmission retrieveQuerySubmission(String queryIdentifier);
+
+ /**
+ * Returns the {@link ComputeLineageSubmission} associated with the given
+ * identifier
+ *
+ * @param lineageIdentifier the identifier of the lineage computation
+ * @return the associated {@link ComputeLineageSubmission}
+ */
+ ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier);
+
+ /**
+ * Submits a Lineage Computation to be completed and returns the
+ * AsynchronousLineageResult that indicates the status of the request and
+ * the results, if the computation is complete.
+ *
+ * @param flowFileUuid the UUID of the FlowFile for which the Lineage should
+ * be calculated
+ * @return a {@link ComputeLineageSubmission} object that can be used to
+ * check if the computing is complete and if so get the results
+ */
+ ComputeLineageSubmission submitLineageComputation(String flowFileUuid);
+
+ /**
+ * Submits a request to expand the parents of the event with the given id
+ *
+ * @param eventRepo the repository from which to obtain the event information
+ * @param eventId the one-up id of the Event to expand
+ * @return a {@link ComputeLineageSubmission} that can be used to check the status and results of the computation
+ *
+ * @throws IllegalArgumentException if the given identifier identifies a
+ * Provenance Event that has a Type that is not expandable or if the
+ * identifier cannot be found
+ */
+ ComputeLineageSubmission submitExpandParents(final ProvenanceEventRepository eventRepo, long eventId);
+
+ /**
+ * Submits a request to expand the children of the event with the given id
+ *
+ * @param eventRepo the repository from which to obtain the event information
+ * @param eventId the one-up id of the Event
+ * @return a {@link ComputeLineageSubmission} that can be used to check the status and results of the computation
+ *
+ * @throws IllegalArgumentException if the given identifier identifies a
+ * Provenance Event that has a Type that is not expandable or if the
+ * identifier cannot be found
+ */
+ ComputeLineageSubmission submitExpandChildren(final ProvenanceEventRepository eventRepo, long eventId);
}