You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:33 UTC

[06/12] incubator-nifi git commit: bug fixes and additional pieces of repo implemented

bug fixes and additional pieces of repo implemented


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b95e7569
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b95e7569
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b95e7569

Branch: refs/heads/journaling-prov-repo
Commit: b95e7569f75c7beb2fd214d3304ae9630f4a8545
Parents: a68bef6
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 12 19:19:37 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 12 19:19:37 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/provenance/EventIdLocation.java |  31 +++
 .../provenance/IdEnrichedProvenanceEvent.java   | 175 ++++++++++++++++
 .../nifi/provenance/StandardLineageResult.java  |   2 +-
 .../org/apache/nifi/util/file/FileUtils.java    |   1 -
 .../MockProvenanceEventRepository.java          |  41 +++-
 .../manager/impl/ClusteredEventAccess.java      |  32 ++-
 .../repository/StandardProcessSession.java      | 164 ++++++---------
 .../nifi/controller/tasks/ExpireFlowFiles.java  |  27 ++-
 .../repository/TestStandardProcessSession.java  |  13 +-
 .../nifi/web/controller/ControllerFacade.java   |   6 +-
 .../pom.xml                                     |   5 +
 .../JournalingProvenanceRepository.java         |  31 +--
 .../config/JournalingRepositoryConfig.java      |   2 +-
 .../journals/StandardJournalReader.java         |   2 +-
 .../partition/JournalingPartition.java          |   6 +
 .../partition/QueuingPartitionManager.java      |  39 +++-
 .../journaling/query/QueryManager.java          |  42 ++++
 .../journaling/query/StandardQueryManager.java  | 144 ++++++++++++++
 ...he.nifi.provenance.ProvenanceEventRepository |  15 ++
 .../TestJournalingProvenanceRepository.java     | 144 ++++++++++++++
 .../nifi/provenance/journaling/TestUtil.java    |   8 +
 .../PersistentProvenanceRepository.java         |  80 ++++++--
 .../nifi/provenance/lucene/DocsReader.java      |  21 +-
 .../nifi/provenance/lucene/IndexSearch.java     |   8 +-
 .../nifi/provenance/lucene/LineageQuery.java    |  11 +-
 .../TestPersistentProvenanceRepository.java     |  44 ++---
 .../nifi-provenance-repository-nar/pom.xml      |   4 +
 .../VolatileProvenanceRepository.java           | 198 +------------------
 .../nifi-provenance-repository-bundle/pom.xml   |   6 +
 29 files changed, 909 insertions(+), 393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java
new file mode 100644
index 0000000..9cc6c4d
--- /dev/null
+++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.provenance.StorageLocation;
+
+public class EventIdLocation implements StorageLocation {
+    private final long id;
+
+    public EventIdLocation(final long id) {
+        this.id = id;
+    }
+    
+    public long getId() {
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java
new file mode 100644
index 0000000..4ef0e5d
--- /dev/null
+++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StorageLocation;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+
+public class IdEnrichedProvenanceEvent implements StoredProvenanceEvent {
+
+    private final ProvenanceEventRecord event;
+    private final long id;
+    
+    public IdEnrichedProvenanceEvent(final ProvenanceEventRecord event) {
+        this(event, event.getEventId());
+    }
+    
+    public IdEnrichedProvenanceEvent(final ProvenanceEventRecord event, final long id) {
+        this.event = event;
+        this.id = id;
+    }
+    
+    @Override
+    public StorageLocation getStorageLocation() {
+        return new EventIdLocation(id);
+    }
+
+    public long getEventId() {
+        return id;
+    }
+
+    public long getEventTime() {
+        return event.getEventTime();
+    }
+
+    public long getFlowFileEntryDate() {
+        return event.getFlowFileEntryDate();
+    }
+
+    public long getLineageStartDate() {
+        return event.getLineageStartDate();
+    }
+
+    public Set<String> getLineageIdentifiers() {
+        return event.getLineageIdentifiers();
+    }
+
+    public long getFileSize() {
+        return event.getFileSize();
+    }
+
+    public Long getPreviousFileSize() {
+        return event.getPreviousFileSize();
+    }
+
+    public long getEventDuration() {
+        return event.getEventDuration();
+    }
+
+    public ProvenanceEventType getEventType() {
+        return event.getEventType();
+    }
+
+    public Map<String, String> getAttributes() {
+        return event.getAttributes();
+    }
+
+    public Map<String, String> getPreviousAttributes() {
+        return event.getPreviousAttributes();
+    }
+
+    public Map<String, String> getUpdatedAttributes() {
+        return event.getUpdatedAttributes();
+    }
+
+    public String getComponentId() {
+        return event.getComponentId();
+    }
+
+    public String getComponentType() {
+        return event.getComponentType();
+    }
+
+    public String getTransitUri() {
+        return event.getTransitUri();
+    }
+
+    public String getSourceSystemFlowFileIdentifier() {
+        return event.getSourceSystemFlowFileIdentifier();
+    }
+
+    public String getFlowFileUuid() {
+        return event.getFlowFileUuid();
+    }
+
+    public List<String> getParentUuids() {
+        return event.getParentUuids();
+    }
+
+    public List<String> getChildUuids() {
+        return event.getChildUuids();
+    }
+
+    public String getAlternateIdentifierUri() {
+        return event.getAlternateIdentifierUri();
+    }
+
+    public String getDetails() {
+        return event.getDetails();
+    }
+
+    public String getRelationship() {
+        return event.getRelationship();
+    }
+
+    public String getSourceQueueIdentifier() {
+        return event.getSourceQueueIdentifier();
+    }
+
+    public String getContentClaimSection() {
+        return event.getContentClaimSection();
+    }
+
+    public String getPreviousContentClaimSection() {
+        return event.getPreviousContentClaimSection();
+    }
+
+    public String getContentClaimContainer() {
+        return event.getContentClaimContainer();
+    }
+
+    public String getPreviousContentClaimContainer() {
+        return event.getPreviousContentClaimContainer();
+    }
+
+    public String getContentClaimIdentifier() {
+        return event.getContentClaimIdentifier();
+    }
+
+    public String getPreviousContentClaimIdentifier() {
+        return event.getPreviousContentClaimIdentifier();
+    }
+
+    public Long getContentClaimOffset() {
+        return event.getContentClaimOffset();
+    }
+
+    public Long getPreviousContentClaimOffset() {
+        return event.getPreviousContentClaimOffset();
+    }
+
+    @Override
+    public String toString() {
+        return event.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
index afb56e8..0f454bd 100644
--- a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
+++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
@@ -178,7 +178,7 @@ public class StandardLineageResult implements ComputeLineageResult {
         }
     }
 
-    public void update(final Collection<ProvenanceEventRecord> records) {
+    public void update(final Collection<StoredProvenanceEvent> records) {
         writeLock.lock();
         try {
             relevantRecords.addAll(records);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
index 41a0557..71dbc79 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.util.file;
 
-import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java
index 241041a..c4caa71 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -30,11 +31,11 @@ import org.apache.nifi.provenance.search.SearchableField;
 
 public class MockProvenanceEventRepository implements ProvenanceEventRepository {
 
-    private final List<ProvenanceEventRecord> records = new ArrayList<>();
+    private final List<StoredProvenanceEvent> records = new ArrayList<>();
     private final AtomicLong idGenerator = new AtomicLong(0L);
 
     @Override
-    public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+    public void registerEvents(final Collection<ProvenanceEventRecord> events) {
         for (final ProvenanceEventRecord event : events) {
             registerEvent(event);
         }
@@ -50,7 +51,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository
         }
         newRecord.setEventId(idGenerator.getAndIncrement());
 
-        records.add(newRecord);
+        records.add(new IdEnrichedProvenanceEvent(newRecord));
     }
 
     @Override
@@ -58,7 +59,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository
     }
 
     @Override
-    public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords) throws IOException {
+    public List<StoredProvenanceEvent> getEvents(long firstRecordId, int maxRecords) throws IOException {
         if (firstRecordId > records.size()) {
             return Collections.emptyList();
         }
@@ -92,7 +93,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository
     }
 
     @Override
-    public ProvenanceEventRecord getEvent(long id) throws IOException {
+    public StoredProvenanceEvent getEvent(long id) throws IOException {
         if (id > records.size()) {
             return null;
         }
@@ -128,4 +129,34 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository
     public ProvenanceEventBuilder eventBuilder() {
         return new StandardProvenanceEventRecord.Builder();
     }
+    
+    @Override
+    public Long getEarliestEventTime() throws IOException {
+        final StoredProvenanceEvent event = getEvent(0);
+        if ( event == null ) {
+            return null;
+        }
+        
+        return event.getEventTime();
+    }
+    
+    @Override
+    public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException {
+        if ( location instanceof EventIdLocation ) {
+            return getEvent( ((EventIdLocation) location).getId() );
+        }
+        throw new IllegalArgumentException("Invalid StorageLocation");
+    }
+    
+    @Override
+    public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException {
+        final List<StoredProvenanceEvent> events = new ArrayList<>(storageLocations.size());
+        for ( final StorageLocation location : storageLocations ) {
+            final StoredProvenanceEvent event = getEvent(location);
+            if ( event != null ) {
+                events.add(event);
+            }
+        }
+        return events;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
index 2015530..7780d04 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
@@ -18,6 +18,8 @@ package org.apache.nifi.cluster.manager.impl;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -25,6 +27,8 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.provenance.ProvenanceEventBuilder;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.StorageLocation;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
 import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
 import org.apache.nifi.provenance.search.Query;
 import org.apache.nifi.provenance.search.QuerySubmission;
@@ -59,12 +63,12 @@ public class ClusteredEventAccess implements EventAccess {
             }
 
             @Override
-            public ProvenanceEventRecord getEvent(long eventId) throws IOException {
+            public StoredProvenanceEvent getEvent(long eventId) throws IOException {
                 return null;
             }
 
             @Override
-            public List<ProvenanceEventRecord> getEvents(long startEventId, int maxEvents) throws IOException {
+            public List<StoredProvenanceEvent> getEvents(long startEventId, int maxEvents) throws IOException {
                 return new ArrayList<>();
             }
 
@@ -88,10 +92,6 @@ public class ClusteredEventAccess implements EventAccess {
             }
 
             @Override
-            public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
-            }
-
-            @Override
             public ComputeLineageSubmission retrieveLineageSubmission(final String submissionId) {
                 return null;
             }
@@ -130,6 +130,26 @@ public class ClusteredEventAccess implements EventAccess {
             public void initialize(EventReporter eventReporter) throws IOException {
 
             }
+            
+            @Override
+            public Long getEarliestEventTime() throws IOException {
+                return null;
+            }
+            
+            @Override
+            public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException {
+                return null;
+            }
+            
+            @Override
+            public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException {
+                return Collections.emptyList();
+            }
+            
+            @Override
+            public void registerEvents(final Collection<ProvenanceEventRecord> events) throws IOException {
+                
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index dcb461c..899fccc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -28,11 +28,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -53,9 +51,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.repository.io.LongHolder;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessSession;
@@ -74,6 +69,9 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.NonCloseableInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -299,7 +297,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         resetReadClaim();
 
         final long updateProvenanceStart = System.nanoTime();
-        updateProvenanceRepo(checkpoint);
+        try {
+            updateProvenanceRepo(checkpoint);
+        } catch (final IOException ioe) {
+            rollback();
+            throw new ProcessException("Provenance Repository failed to update", ioe);
+        }
 
         final long claimRemovalStart = System.nanoTime();
         final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
@@ -497,7 +500,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         eventTypes.add(eventType);
     }
     
-    private void updateProvenanceRepo(final Checkpoint checkpoint) {
+    private void updateProvenanceRepo(final Checkpoint checkpoint) throws IOException {
         // Update Provenance Repository
         final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository();
 
@@ -641,46 +644,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;
-        final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
-            final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
-            final Iterator<ProvenanceEventRecord> autoTermIterator = autoTermEvents == null ? null : autoTermEvents.iterator();
-
-            @Override
-            public Iterator<ProvenanceEventRecord> iterator() {
-                return new Iterator<ProvenanceEventRecord>() {
-                    @Override
-                    public boolean hasNext() {
-                        return recordsToSubmitIterator.hasNext() || (autoTermIterator != null && autoTermIterator.hasNext());
-                    }
-
-                    @Override
-                    public ProvenanceEventRecord next() {
-                        if (recordsToSubmitIterator.hasNext()) {
-                            final ProvenanceEventRecord rawEvent = recordsToSubmitIterator.next();
-
-                            // Update the Provenance Event Record with all of the info that we know about the event.
-                            // For SEND events, we do not want to update the FlowFile info on the Event, because the event should
-                            // reflect the FlowFile as it was sent to the remote system. However, for other events, we want to use
-                            // the representation of the FlowFile as it is committed, as this is the only way in which it really
-                            // exists in our system -- all other representations are volatile representations that have not been
-                            // exposed.
-                            return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND);
-                        } else if (autoTermIterator != null && autoTermIterator.hasNext()) {
-                            return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true);
-                        }
-
-                        throw new NoSuchElementException();
-                    }
-
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-
-        provenanceRepo.registerEvents(iterable);
+        
+        final List<ProvenanceEventRecord> enrichedEvents = new ArrayList<>();
+        for ( final ProvenanceEventRecord record : recordsToSubmit ) {
+            enrichedEvents.add(enrich(record, flowFileRecordMap, checkpoint.records, record.getEventType() != ProvenanceEventType.SEND));
+        }
+        for ( final ProvenanceEventRecord record : autoTermEvents ) {
+            enrichedEvents.add(enrich(record, flowFileRecordMap, checkpoint.records, true));
+        }
+        
+        provenanceRepo.registerEvents(enrichedEvents);
     }
 
     private void updateEventContentClaims(final ProvenanceEventBuilder builder, final FlowFile flowFile, final StandardRepositoryRecord repoRecord) {
@@ -1140,7 +1113,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % connections.size());
             final Set<FlowFileRecord> expired = new HashSet<>();
             final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired);
-            removeExpired(expired, conn);
+            
+            try {
+                removeExpired(expired, conn);
+            } catch (final IOException ioe) {
+                throw new ProcessException("Failed to update repositories to remove expired FlowFiles", ioe);
+            }
 
             if (flowFile != null) {
                 registerDequeuedRecord(flowFile, conn);
@@ -1201,7 +1179,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             for (final Connection conn : connections) {
                 final Set<FlowFileRecord> expired = new HashSet<>();
                 final List<FlowFileRecord> newlySelected = poller.poll(conn.getFlowFileQueue(), expired);
-                removeExpired(expired, conn);
+                try {
+                    removeExpired(expired, conn);
+                } catch (final IOException ioe) {
+                    throw new ProcessException("Failed to update repositories to remove expired FlowFiles", ioe);
+                }
 
                 if (newlySelected.isEmpty() && expired.isEmpty()) {
                     continue;
@@ -1571,7 +1553,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
     }
 
-    public void expireFlowFiles() {
+    public void expireFlowFiles() throws IOException {
         final Set<FlowFileRecord> expired = new HashSet<>();
         final FlowFileFilter filter = new FlowFileFilter() {
             @Override
@@ -1589,7 +1571,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
     }
 
-    private void removeExpired(final Set<FlowFileRecord> flowFiles, final Connection connection) {
+    private void removeExpired(final Set<FlowFileRecord> flowFiles, final Connection connection) throws IOException {
         if (flowFiles.isEmpty()) {
             return;
         }
@@ -1612,7 +1594,31 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
         for (final FlowFileRecord flowFile : flowFiles) {
             recordIdMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
+        }
+        
+        final Set<ProvenanceEventRecord> expiredEvents = expiredReporter.getEvents();
+        final List<ProvenanceEventRecord> events = new ArrayList<>(expiredEvents.size());
+        for ( final ProvenanceEventRecord event : expiredEvents ) {
+            final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event);
+            final FlowFileRecord record = recordIdMap.get(event.getFlowFileUuid());
+            if (record == null) {
+                continue;
+            }
+
+            final ContentClaim claim = record.getContentClaim();
+            if (claim != null) {
+                enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+                enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+            }
 
+            enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
+            events.add(enriched.build());
+        }
+        
+        context.getProvenanceRepository().registerEvents(events);
+        context.getFlowFileRepository().updateRepository(expiredRecords);
+        
+        for ( final FlowFileRecord flowFile : flowFiles ) {
             final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
             record.markForDelete();
             expiredRecords.add(record);
@@ -1623,53 +1629,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable;
             LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
         }
-
-        try {
-            final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
-                @Override
-                public Iterator<ProvenanceEventRecord> iterator() {
-                    final Iterator<ProvenanceEventRecord> expiredEventIterator = expiredReporter.getEvents().iterator();
-                    final Iterator<ProvenanceEventRecord> enrichingIterator = new Iterator<ProvenanceEventRecord>() {
-                        @Override
-                        public boolean hasNext() {
-                            return expiredEventIterator.hasNext();
-                        }
-
-                        @Override
-                        public ProvenanceEventRecord next() {
-                            final ProvenanceEventRecord event = expiredEventIterator.next();
-                            final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event);
-                            final FlowFileRecord record = recordIdMap.get(event.getFlowFileUuid());
-                            if (record == null) {
-                                return null;
-                            }
-
-                            final ContentClaim claim = record.getContentClaim();
-                            if (claim != null) {
-                                enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
-                                enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
-                            }
-
-                            enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
-                            return enriched.build();
-                        }
-
-                        @Override
-                        public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-
-                    return enrichingIterator;
-                }
-            };
-
-            context.getProvenanceRepository().registerEvents(iterable);
-            context.getFlowFileRepository().updateRepository(expiredRecords);
-        } catch (final IOException e) {
-            LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e);
-        }
-
     }
 
     private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset) throws ContentNotFoundException {
@@ -2438,7 +2397,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final ProvenanceEventRecord dropEvent = provenanceReporter.drop(suspectRecord.getCurrent(), nfe.getMessage() == null ? "Content Not Found" : nfe.getMessage());
         if (dropEvent != null) {
-            context.getProvenanceRepository().registerEvent(dropEvent);
+            try {
+                context.getProvenanceRepository().registerEvent(dropEvent);
+            } catch (final IOException ioe) {
+                LOG.error("{} Failed to register DROP Provenance event for {} when handling ContentNotFound error due to {}", this, suspectRecord.getCurrent(), ioe.toString());
+                if ( LOG.isDebugEnabled() ) {
+                    LOG.error("", ioe);
+                }
+            }
         }
 
         if (missingClaim == registeredClaim) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
index a351a68..d0020b5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
@@ -16,9 +16,12 @@
  */
 package org.apache.nifi.controller.tasks;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.print.attribute.standard.Severity;
+
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
@@ -29,9 +32,12 @@ import org.apache.nifi.controller.repository.ProcessContext;
 import org.apache.nifi.controller.repository.StandardProcessSession;
 import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
 import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This task runs through all Connectable Components and goes through its
@@ -39,7 +45,8 @@ import org.apache.nifi.util.FormatUtils;
  * desired side effect of expiring old FlowFiles.
  */
 public class ExpireFlowFiles implements Runnable {
-
+    private static final Logger logger = LoggerFactory.getLogger(ExpireFlowFiles.class);
+    
     private final FlowController flowController;
     private final ProcessContextFactory contextFactory;
 
@@ -51,7 +58,19 @@ public class ExpireFlowFiles implements Runnable {
     @Override
     public void run() {
         final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
-        expireFlowFiles(rootGroup);
+        
+        try {
+            expireFlowFiles(rootGroup);
+        } catch (final Exception e) {
+            logger.error("Failed to expire FlowFiles due to {}", e.toString());
+            if ( logger.isDebugEnabled() ) {
+                logger.error("", e);
+            }
+            
+            flowController.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+                    "FlowFile Expiration", Severity.ERROR.getName(), "Could not expire FlowFiles due to " + e));
+            
+        }
     }
 
     private StandardProcessSession createSession(final Connectable connectable) {
@@ -60,7 +79,7 @@ public class ExpireFlowFiles implements Runnable {
         return sessionFactory.createSession();
     }
 
-    private void expireFlowFiles(final Connectable connectable) {
+    private void expireFlowFiles(final Connectable connectable) throws IOException {
         // determine if the incoming connections for this Connectable have Expiration configured.
         boolean expirationConfigured = false;
         for (final Connection incomingConn : connectable.getIncomingConnections()) {
@@ -80,7 +99,7 @@ public class ExpireFlowFiles implements Runnable {
         session.commit();
     }
 
-    private void expireFlowFiles(final ProcessGroup group) {
+    private void expireFlowFiles(final ProcessGroup group) throws IOException {
         for (final ProcessorNode procNode : group.getProcessors()) {
             expireFlowFiles(procNode);
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 1ff63c5..7ae156c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -65,6 +65,7 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -357,7 +358,7 @@ public class TestStandardProcessSession {
         
         session.commit();
         
-        final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
+        final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 1000);
 
         // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's
         assertEquals(3, events.size());
@@ -412,7 +413,7 @@ public class TestStandardProcessSession {
         session.remove(orig);
         session.commit();
 
-        final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
+        final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 1000);
         assertEquals(2, events.size());
 
         final ProvenanceEventRecord firstRecord = events.get(0);
@@ -838,7 +839,7 @@ public class TestStandardProcessSession {
         session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
         
-        final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+        final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
         
@@ -857,7 +858,7 @@ public class TestStandardProcessSession {
         session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
         
-        final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+        final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
         
@@ -883,7 +884,7 @@ public class TestStandardProcessSession {
         session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
         
-        final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+        final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
         
@@ -904,7 +905,7 @@ public class TestStandardProcessSession {
         session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
         session.commit();
         
-        final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+        final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000);
         assertFalse(events.isEmpty());
         assertEquals(1, events.size());
         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index b009581..56db464 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -697,9 +697,9 @@ public class ControllerFacade implements ControllerServiceProvider {
             resultsDto.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime()));
 
             // get the oldest available event time
-            final List<ProvenanceEventRecord> firstEvent = provenanceRepository.getEvents(0, 1);
-            if (!firstEvent.isEmpty()) {
-                resultsDto.setOldestEvent(new Date(firstEvent.get(0).getEventTime()));
+            final Long oldestEventTime = provenanceRepository.getEarliestEventTime();
+            if (oldestEventTime != null) {
+                resultsDto.setOldestEvent(new Date(oldestEventTime));
             }
 
             provenanceDto.setResults(resultsDto);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
index 5997281..4e9e9fb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
@@ -36,5 +36,10 @@
 			<groupId>org.apache.lucene</groupId>
 			<artifactId>lucene-queryparser</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-simple</artifactId>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
index 2130e73..7911d73 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@ -55,6 +55,8 @@ import org.apache.nifi.provenance.journaling.partition.PartitionAction;
 import org.apache.nifi.provenance.journaling.partition.PartitionManager;
 import org.apache.nifi.provenance.journaling.partition.QueuingPartitionManager;
 import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction;
+import org.apache.nifi.provenance.journaling.query.QueryManager;
+import org.apache.nifi.provenance.journaling.query.StandardQueryManager;
 import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
 import org.apache.nifi.provenance.journaling.toc.TocReader;
 import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
@@ -67,15 +69,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class JournalingProvenanceRepository implements ProvenanceEventRepository {
+    public static final String BLOCK_SIZE = "nifi.provenance.block.size";
+    
     private static final Logger logger = LoggerFactory.getLogger(JournalingProvenanceRepository.class);
     
     private final JournalingRepositoryConfig config;
-    private final PartitionManager partitionManager;
     private final AtomicLong idGenerator = new AtomicLong(0L);
-    
-    private EventReporter eventReporter;    // effectively final
     private final ExecutorService executor;
     
+    private EventReporter eventReporter;    // effectively final
+    private PartitionManager partitionManager;  // effectively final
+    private QueryManager queryManager;    // effectively final
     
     public JournalingProvenanceRepository() throws IOException {
         this(createConfig());
@@ -84,7 +88,6 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
     public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException {
         this.config = config;
         this.executor = Executors.newFixedThreadPool(config.getThreadPoolSize());
-        this.partitionManager = new QueuingPartitionManager(config, executor);
     }
     
     
@@ -110,7 +113,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
         final boolean compressOnRollover = Boolean.parseBoolean(properties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER));
         final String indexedFieldString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
         final String indexedAttrString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
-
+        final int blockSize = properties.getIntegerProperty(BLOCK_SIZE, 1000);
+        
         final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false"));
 
         final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
@@ -137,7 +141,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
         config.setMaxStorageCapacity(maxStorageBytes);
         config.setThreadPoolSize(queryThreads);
         config.setPartitionCount(journalCount);
-
+        config.setBlockSize(blockSize);
+        
         if (shardSize != null) {
             config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
         }
@@ -150,6 +155,9 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
     @Override
     public synchronized void initialize(final EventReporter eventReporter) throws IOException {
         this.eventReporter = eventReporter;
+        
+        this.partitionManager = new QueuingPartitionManager(config, executor);
+        this.queryManager = new StandardQueryManager(partitionManager, config, 10);
     }
 
     @Override
@@ -328,14 +336,12 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
     
     @Override
     public QuerySubmission submitQuery(final Query query) {
-        // TODO Auto-generated method stub
-        return null;
+        return queryManager.submitQuery(query);
     }
 
     @Override
     public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) {
-        // TODO Auto-generated method stub
-        return null;
+        return queryManager.retrieveQuerySubmission(queryIdentifier);
     }
 
     @Override
@@ -364,7 +370,10 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
 
     @Override
     public void close() throws IOException {
-        partitionManager.shutdown();
+        if ( partitionManager != null ) {
+            partitionManager.shutdown();
+        }
+        
         executor.shutdown();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
index 6dd7be9..8998932 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
@@ -34,7 +34,7 @@ public class JournalingRepositoryConfig {
     private long journalCapacity = 1024L * 1024L * 5L;   // 5 MB
     private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
     private int partitionCount = 16;
-    private int blockSize = 100;
+    private int blockSize = 5000;
 
     private List<SearchableField> searchableFields = new ArrayList<>();
     private List<SearchableField> searchableAttributes = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
index 82ef39b..2ec5131 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -163,7 +163,7 @@ public class StandardJournalReader implements JournalReader {
             }
         }
         
-        throw new IOException("Could not find event with ID " + eventId);
+        throw new IOException("Could not find event with ID " + eventId + " in " + this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index 51f84a2..651c41e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -214,6 +214,7 @@ public class JournalingPartition implements Partition {
         final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);
         journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());
         tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false);
+        tocWriter.addBlockOffset(journalWriter.getSize());
         numEventsAtEndOfLastBlock = 0;
     }
     
@@ -421,4 +422,9 @@ public class JournalingPartition implements Partition {
     public Long getEarliestEventTime() throws IOException {
         return earliestEventTime;
     }
+    
+    @Override
+    public String toString() {
+        return "Partition[section=" + sectionName + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
index 4ac0fc6..51d90e2 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -35,9 +35,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
 import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class QueuingPartitionManager implements PartitionManager {
-
+    
+    private static final Logger logger = LoggerFactory.getLogger(QueuingPartitionManager.class);
+    
     private final JournalingRepositoryConfig config;
     private final BlockingQueue<Partition> partitionQueue;
     private final JournalingPartition[] partitionArray;
@@ -180,6 +184,39 @@ public class QueuingPartitionManager implements PartitionManager {
     
     @Override
     public void withEachPartition(final VoidPartitionAction action, final boolean async) {
+        // TODO: Do not use blacklisted partitions.
+        final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);
+        for ( final Partition partition : partitionArray ) {
+            final Runnable runnable = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        action.perform(partition);
+                    } catch (final Throwable t) {
+                        logger.error("Failed to perform action against " + partition + " due to " + t);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.error("", t);
+                        }
+                    }
+                }
+            };
+            
+            final Future<?> future = executor.submit(runnable);
+            futures.put(partition, future);
+        }
         
+        if ( !async ) {
+            for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) {
+                try {
+                    // throw any exception thrown by runnable
+                    entry.getValue().get();
+                } catch (final ExecutionException ee) {
+                    final Throwable cause = ee.getCause();
+                    throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
new file mode 100644
index 0000000..4edc6ad
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.query;
+
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+
+public interface QueryManager {
+    /**
+     * Submits an asynchronous request to process the given query, returning an
+     * identifier that can be used to fetch the results at a later time
+     *
+     * @param query
+     * @return
+     */
+    QuerySubmission submitQuery(Query query);
+    
+    /**
+     * Returns the QueryResult associated with the given identifier, if the
+     * query has finished processing. If the query has not yet finished running,
+     * returns <code>null</code>.
+     *
+     * @param queryIdentifier
+     *
+     * @return
+     */
+    QuerySubmission retrieveQuerySubmission(String queryIdentifier);
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
new file mode 100644
index 0000000..4cce231
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.query;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.provenance.AsyncQuerySubmission;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+import org.apache.nifi.provenance.journaling.index.QueryUtils;
+import org.apache.nifi.provenance.journaling.index.SearchResult;
+import org.apache.nifi.provenance.journaling.journals.JournalReader;
+import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
+import org.apache.nifi.provenance.journaling.partition.Partition;
+import org.apache.nifi.provenance.journaling.partition.PartitionManager;
+import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction;
+import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
+import org.apache.nifi.provenance.journaling.toc.TocReader;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StandardQueryManager implements QueryManager {
+    private static final Logger logger = LoggerFactory.getLogger(StandardQueryManager.class);
+    
+    private final int maxConcurrentQueries;
+    private final JournalingRepositoryConfig config;
+    private final PartitionManager partitionManager;
+    private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
+    
+    public StandardQueryManager(final PartitionManager partitionManager, final JournalingRepositoryConfig config, final int maxConcurrentQueries) {
+        this.config = config;
+        this.maxConcurrentQueries = maxConcurrentQueries;
+        this.partitionManager = partitionManager;
+    }
+    
+    @Override
+    public QuerySubmission submitQuery(final Query query) {
+        final int numQueries = querySubmissionMap.size();
+        if (numQueries > maxConcurrentQueries) {
+            throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not been deleted (likely due to poorly behaving clients not issuing DELETE requests). Please try again later.");
+        }
+
+        if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
+            throw new IllegalArgumentException("Query End Time cannot be before Query Start Time");
+        }
+
+        if (query.getSearchTerms().isEmpty() && query.getStartDate() == null && query.getEndDate() == null) {
+            final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1);
+
+            querySubmissionMap.put(query.getIdentifier(), result);
+            return result;
+        }
+
+        final AtomicInteger retrievalCount = new AtomicInteger(query.getMaxResults());
+        final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, config.getPartitionCount()) {
+            @Override
+            public void cancel() {
+                super.cancel();
+                querySubmissionMap.remove(query.getIdentifier());
+            }
+        };
+        
+        querySubmissionMap.put(query.getIdentifier(), submission);
+
+        partitionManager.withEachPartition(new VoidPartitionAction() {
+            @SuppressWarnings({ "rawtypes", "unchecked" })
+            @Override
+            public void perform(final Partition partition) throws IOException {
+                logger.debug("Running {} against {}", query, partition);
+                
+                try (final EventIndexSearcher searcher = partition.newIndexSearcher()) {
+                    final SearchResult searchResult = searcher.search(query);
+                    logger.debug("{} has {} hits against {} over {} files", query, searchResult.getTotalCount(), partition, searchResult.getLocations().size());
+                    
+                    final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>();
+                    final Map<File, List<JournaledStorageLocation>> locationMap = QueryUtils.orderLocations((List) searchResult.getLocations(), config);
+                    for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : locationMap.entrySet() ) {
+                        final File journalFile = entry.getKey();
+                        final List<JournaledStorageLocation> locations = entry.getValue();
+                        
+                        if ( retrievalCount.get() <= 0 ) {
+                            break;
+                        }
+                        
+                        try (final JournalReader reader = new StandardJournalReader(journalFile);
+                             final TocReader tocReader = new StandardTocReader(QueryUtils.getTocFile(journalFile))) {
+                            
+                            for ( final JournaledStorageLocation location : locations ) {
+                                final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex());
+                                final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId());
+                                matchingRecords.add(new JournaledProvenanceEvent(event, location));
+                                
+                                final int recordsLeft = retrievalCount.decrementAndGet();
+                                if ( recordsLeft <= 0 ) {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    
+                    logger.debug("Finished executing {} against {}", query, partition);
+                    submission.getResult().update(matchingRecords, searchResult.getTotalCount());
+                } catch (final Exception e) {
+                    submission.getResult().setError("Failed to query " + partition + " due to " + e.toString());
+                    throw e;
+                }
+            }
+        }, true);
+        
+        return submission;
+    }
+
+    @Override
+    public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) {
+        return querySubmissionMap.get(queryIdentifier);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository
new file mode 100644
index 0000000..e224c51
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.provenance.journaling.JournalingProvenanceRepository
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
new file mode 100644
index 0000000..a547a8a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QueryResult;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchTerms;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestJournalingProvenanceRepository {
+
+    
+    @BeforeClass
+    public static void setupLogging() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance.journaling", "DEBUG");
+    }
+    
+    @Test
+    public void testStoreAndRetrieve() throws IOException {
+        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+        final Map<String, File> containers = new HashMap<>();
+        containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+        containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+        config.setContainers(containers);
+        config.setPartitionCount(3);
+        
+        try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+            repo.initialize(null);
+            final Map<String, String> attributes = new HashMap<>();
+            
+            for (int i=0; i < 10; i++) {
+                attributes.put("i", String.valueOf(i));
+                repo.registerEvent(TestUtil.generateEvent(i, attributes));
+            }
+            
+            // retrieve records one at a time.
+            for (int i=0; i < 10; i++) {
+                final StoredProvenanceEvent event = repo.getEvent(i);
+                assertNotNull(event);
+                assertEquals((long) i, event.getEventId());
+                assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+            }
+            
+            final List<StoredProvenanceEvent> events = repo.getEvents(0, 1000);
+            assertNotNull(events);
+            assertEquals(10, events.size());
+            for (int i=0; i < 10; i++) {
+                final StoredProvenanceEvent event = events.get(i);
+                assertNotNull(event);
+                assertEquals((long) i, event.getEventId());
+                assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+            }
+        } finally {
+            for ( final File file : containers.values() ) {
+                if ( file.exists() ) {
+                    FileUtils.deleteFile(file, true);
+                }
+            }
+        }
+    }
+    
+    
+    @Test(timeout=10000000)
+    public void testSearchByUUID() throws IOException, InterruptedException {
+        final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+        final Map<String, File> containers = new HashMap<>();
+        containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+        containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+        config.setContainers(containers);
+        
+        config.setPartitionCount(3);
+        config.setSearchableFields(Arrays.asList(new SearchableField[] {
+                SearchableFields.FlowFileUUID
+        }));
+        
+        try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+            repo.initialize(null);
+            
+            final Map<String, String> attributes = new HashMap<>();
+            
+            for (int i=0; i < 10; i++) {
+                attributes.put("i", String.valueOf(i));
+                repo.registerEvent(TestUtil.generateEvent(i, attributes));
+            }
+            
+            final Query query = new Query("query");
+            query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000-000000000005"));
+            final QuerySubmission submission = repo.submitQuery(query);
+            assertNotNull(submission);
+            
+            final QueryResult result = submission.getResult();
+            while ( !result.isFinished() ) {
+                Thread.sleep(50L);
+            }
+            
+            assertNull(result.getError());
+            final List<StoredProvenanceEvent> matches = result.getMatchingEvents();
+            assertNotNull(matches);
+            assertEquals(1, matches.size());
+            
+            final StoredProvenanceEvent event = matches.get(0);
+            assertEquals(5, event.getEventId());
+            assertEquals("00000000-0000-0000-0000-000000000005", event.getFlowFileUuid());
+        } finally {
+            for ( final File file : containers.values() ) {
+                FileUtils.deleteFile(file, true);
+            }
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
index 45b7338..6d05f7a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.provenance.journaling;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -23,7 +25,12 @@ import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 
 public class TestUtil {
+    
     public static ProvenanceEventRecord generateEvent(final long id) {
+        return generateEvent(id, Collections.<String, String>emptyMap());
+    }
+    
+    public static ProvenanceEventRecord generateEvent(final long id, final Map<String, String> attributes) {
         // Create prov event to add to the stream
         final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder()
             .setEventType(ProvenanceEventType.CREATE)
@@ -34,6 +41,7 @@ public class TestUtil {
             .setFlowFileEntryDate(System.currentTimeMillis() - 1000L)
             .setLineageStartDate(System.currentTimeMillis() - 2000L)
             .setCurrentContentClaim(null, null, null, null, 0L)
+            .setAttributes(null, attributes == null ? Collections.<String, String>emptyMap() : attributes)
             .build();
         
         return event;