You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/02 20:32:20 UTC

[23/50] [abbrv] nifi git commit: NIFI-1082: Ensure that events returned from the provenance repository are ordered such that newest events are provided first

NIFI-1082: Ensure that events returned from the provenance repository are ordered such that newest events are provided first


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cf8ca3dc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cf8ca3dc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cf8ca3dc

Branch: refs/heads/NIFI-730
Commit: cf8ca3dc2c9ee9220c0b83b6c003ef20c67fbd33
Parents: dc4004d
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 28 14:45:13 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 28 17:32:51 2015 -0400

----------------------------------------------------------------------
 .../StandardProvenanceEventRecord.java          |  36 ++--
 .../nifi/provenance/StandardQueryResult.java    |  25 ++-
 .../nifi/provenance/IndexConfiguration.java     |  47 +++---
 .../PersistentProvenanceRepository.java         | 124 +++++++-------
 .../nifi/provenance/lucene/DocsReader.java      |  49 ++----
 .../nifi/provenance/lucene/IndexSearch.java     |  35 ++--
 .../nifi/provenance/lucene/LineageQuery.java    |   7 +-
 .../TestPersistentProvenanceRepository.java     | 163 ++++++++++++++++++-
 .../VolatileProvenanceRepository.java           |  16 +-
 9 files changed, 327 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
index 4eb7001..892a8f8 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java
@@ -71,7 +71,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
     private final Map<String, String> previousAttributes;
     private final Map<String, String> updatedAttributes;
 
-    private volatile long eventId;
+    private volatile long eventId = -1L;
 
     private StandardProvenanceEventRecord(final Builder builder) {
         this.eventTime = builder.eventTime;
@@ -105,8 +105,8 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
         contentClaimOffset = builder.contentClaimOffset;
         contentSize = builder.contentSize;
 
-        previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
-        updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
+        previousAttributes = builder.previousAttributes == null ? Collections.<String, String> emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
+        updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String> emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
 
         sourceQueueIdentifier = builder.sourceQueueIdentifier;
 
@@ -198,12 +198,12 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
 
     @Override
     public List<String> getParentUuids() {
-        return parentUuids == null ? Collections.<String>emptyList() : parentUuids;
+        return parentUuids == null ? Collections.<String> emptyList() : parentUuids;
     }
 
     @Override
     public List<String> getChildUuids() {
-        return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids;
+        return childrenUuids == null ? Collections.<String> emptyList() : childrenUuids;
     }
 
     @Override
@@ -298,7 +298,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
         }
 
         return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
-                + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
+            + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
     }
 
     @Override
@@ -316,7 +316,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
         final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
         // If event ID's are populated and not equal, return false. If they have not yet been populated, do not
         // use them in the comparison.
-        if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
+        if (eventId >= 0L && other.getEventId() >= 0L && eventId != other.getEventId()) {
             return false;
         }
         if (eventType != other.eventType) {
@@ -397,16 +397,16 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
     @Override
     public String toString() {
         return "ProvenanceEventRecord ["
-                + "eventId=" + eventId
-                + ", eventType=" + eventType
-                + ", eventTime=" + new Date(eventTime)
-                + ", uuid=" + uuid
-                + ", fileSize=" + contentSize
-                + ", componentId=" + componentId
-                + ", transitUri=" + transitUri
-                + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
-                + ", parentUuids=" + parentUuids
-                + ", alternateIdentifierUri=" + alternateIdentifierUri + "]";
+            + "eventId=" + eventId
+            + ", eventType=" + eventType
+            + ", eventTime=" + new Date(eventTime)
+            + ", uuid=" + uuid
+            + ", fileSize=" + contentSize
+            + ", componentId=" + componentId
+            + ", transitUri=" + transitUri
+            + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
+            + ", parentUuids=" + parentUuids
+            + ", alternateIdentifierUri=" + alternateIdentifierUri + "]";
     }
 
     public static class Builder implements ProvenanceEventBuilder {
@@ -663,7 +663,7 @@ public final class StandardProvenanceEventRecord implements ProvenanceEventRecor
             setFlowFileEntryDate(flowFile.getEntryDate());
             setLineageIdentifiers(flowFile.getLineageIdentifiers());
             setLineageStartDate(flowFile.getLineageStartDate());
-            setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes());
+            setAttributes(Collections.<String, String> emptyMap(), flowFile.getAttributes());
             uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
             this.contentSize = flowFile.getSize();
             return this;

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
index 9a9a27d..bef63e1 100644
--- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
+++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -40,7 +41,7 @@ public class StandardQueryResult implements QueryResult {
 
     private final Lock writeLock = rwLock.writeLock();
     // guarded by writeLock
-    private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>();
+    private final List<List<ProvenanceEventRecord>> matchingRecords;
     private long totalHitCount;
     private int numCompletedSteps = 0;
     private Date expirationDate;
@@ -53,6 +54,11 @@ public class StandardQueryResult implements QueryResult {
         this.query = query;
         this.numSteps = numSteps;
         this.creationNanos = System.nanoTime();
+        this.matchingRecords = new ArrayList<>(numSteps);
+
+        for (int i = 0; i < Math.max(1, numSteps); i++) {
+            matchingRecords.add(Collections.<ProvenanceEventRecord> emptyList());
+        }
 
         updateExpiration();
     }
@@ -61,13 +67,14 @@ public class StandardQueryResult implements QueryResult {
     public List<ProvenanceEventRecord> getMatchingEvents() {
         readLock.lock();
         try {
-            if (matchingRecords.size() <= query.getMaxResults()) {
-                return new ArrayList<>(matchingRecords);
-            }
-
             final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults());
-            for (int i = 0; i < query.getMaxResults(); i++) {
-                copy.add(matchingRecords.get(i));
+            for (final List<ProvenanceEventRecord> recordList : matchingRecords) {
+                if (copy.size() + recordList.size() > query.getMaxResults()) {
+                    copy.addAll(recordList.subList(0, query.getMaxResults() - copy.size()));
+                    return copy;
+                } else {
+                    copy.addAll(recordList);
+                }
             }
 
             return copy;
@@ -141,10 +148,10 @@ public class StandardQueryResult implements QueryResult {
         }
     }
 
-    public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) {
+    public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits, final int indexId) {
         writeLock.lock();
         try {
-            this.matchingRecords.addAll(matchingRecords);
+            this.matchingRecords.set(indexId, new ArrayList<>(matchingRecords));
             this.totalHitCount += totalHits;
 
             numCompletedSteps++;

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
index 4e80811..a6e6d5d 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/IndexConfiguration.java
@@ -36,7 +36,6 @@ import java.util.regex.Pattern;
 
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +172,9 @@ public class IndexConfiguration {
             for (final List<File> list : indexDirectoryMap.values()) {
                 files.addAll(list);
             }
+
+            Collections.sort(files, new IndexDirectoryComparator());
+
             return files;
         } finally {
             lock.unlock();
@@ -198,11 +200,11 @@ public class IndexConfiguration {
      * span (times inclusive).
      *
      * @param startTime the start time of the query for which the indices are
-     * desired
+     *            desired
      * @param endTime the end time of the query for which the indices are
-     * desired
+     *            desired
      * @return the index directories that are applicable only for the given time
-     * span (times inclusive).
+     *         span (times inclusive).
      */
     public List<File> getIndexDirectories(final Long startTime, final Long endTime) {
         if (startTime == null && endTime == null) {
@@ -213,14 +215,7 @@ public class IndexConfiguration {
         lock.lock();
         try {
             final List<File> sortedIndexDirectories = getIndexDirectories();
-            Collections.sort(sortedIndexDirectories, new Comparator<File>() {
-                @Override
-                public int compare(final File o1, final File o2) {
-                    final long epochTimestamp1 = getIndexStartTime(o1);
-                    final long epochTimestamp2 = getIndexStartTime(o2);
-                    return Long.compare(epochTimestamp1, epochTimestamp2);
-                }
-            });
+            Collections.sort(sortedIndexDirectories, new IndexDirectoryComparator());
 
             for (final File indexDir : sortedIndexDirectories) {
                 // If the index was last modified before the start time, we know that it doesn't
@@ -252,9 +247,9 @@ public class IndexConfiguration {
      * event log
      *
      * @param provenanceLogFile the provenance log file for which the index
-     * directories are desired
+     *            directories are desired
      * @return the index directories that are applicable only for the given
-     * event log
+     *         event log
      */
     public List<File> getIndexDirectories(final File provenanceLogFile) {
         final List<File> dirs = new ArrayList<>();
@@ -262,23 +257,16 @@ public class IndexConfiguration {
         try {
             final List<File> indices = indexDirectoryMap.get(provenanceLogFile.getParentFile());
             if (indices == null) {
-                return Collections.<File>emptyList();
+                return Collections.<File> emptyList();
             }
 
             final List<File> sortedIndexDirectories = new ArrayList<>(indices);
-            Collections.sort(sortedIndexDirectories, new Comparator<File>() {
-                @Override
-                public int compare(final File o1, final File o2) {
-                    final long epochTimestamp1 = getIndexStartTime(o1);
-                    final long epochTimestamp2 = getIndexStartTime(o2);
-                    return Long.compare(epochTimestamp1, epochTimestamp2);
-                }
-            });
+            Collections.sort(sortedIndexDirectories, new IndexDirectoryComparator());
 
             final Long firstEntryTime = getFirstEntryTime(provenanceLogFile);
             if (firstEntryTime == null) {
                 logger.debug("Found no records in {} so returning no Indices for it", provenanceLogFile);
-                return Collections.<File>emptyList();
+                return Collections.<File> emptyList();
             }
 
             boolean foundIndexCreatedLater = false;
@@ -376,7 +364,7 @@ public class IndexConfiguration {
         lock.lock();
         try {
             if (minIndexedId == null || id > minIndexedId) {
-                if (maxIndexedId == null || id > maxIndexedId) {  // id will be > maxIndexedId if all records were expired
+                if (maxIndexedId == null || id > maxIndexedId) { // id will be > maxIndexedId if all records were expired
                     minIndexedId = maxIndexedId;
                 } else {
                     minIndexedId = id;
@@ -395,4 +383,13 @@ public class IndexConfiguration {
             lock.unlock();
         }
     }
+
+    private class IndexDirectoryComparator implements Comparator<File> {
+        @Override
+        public int compare(final File o1, final File o2) {
+            final long epochTimestamp1 = getIndexStartTime(o1);
+            final long epochTimestamp2 = getIndexStartTime(o2);
+            return -Long.compare(epochTimestamp1, epochTimestamp2);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 89e1419..7f1bf8c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -108,7 +108,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     public static final String EVENT_CATEGORY = "Provenance Repository";
     private static final String FILE_EXTENSION = ".prov";
     private static final String TEMP_FILE_SUFFIX = ".prov.part";
-    private static final long PURGE_EVENT_MILLISECONDS = 2500L; //Determines the frequency over which the task to delete old events will occur
+    private static final long PURGE_EVENT_MILLISECONDS = 2500L; // Determines the frequency over which the task to delete old events will occur
     public static final int SERIALIZATION_VERSION = 8;
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
     public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
@@ -404,9 +404,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 // if this is the first record, try to find out the block index and jump directly to
                 // the block index. This avoids having to read through a lot of data that we don't care about
                 // just to get to the first record that we want.
-                if ( records.isEmpty() ) {
+                if (records.isEmpty()) {
                     final TocReader tocReader = reader.getTocReader();
-                    if ( tocReader != null ) {
+                    if (tocReader != null) {
                         final Integer blockIndex = tocReader.getBlockIndexForEventId(firstRecordId);
                         if (blockIndex != null) {
                             reader.skipToBlock(blockIndex);
@@ -641,7 +641,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
             indexManager.close();
 
-            if ( writers != null ) {
+            if (writers != null) {
                 for (final RecordWriter writer : writers) {
                     writer.close();
                 }
@@ -700,7 +700,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     // journal will result in corruption!
                     writer.markDirty();
                     dirtyWriterCount.incrementAndGet();
-                    streamStartTime.set(0L);    // force rollover to happen soon.
+                    streamStartTime.set(0L); // force rollover to happen soon.
                     throw t;
                 } finally {
                     writer.unlock();
@@ -912,15 +912,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 removed.add(baseName);
             } catch (final FileNotFoundException fnf) {
                 logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} because the file no longer exists; will not "
-                        + "perform additional Expiration Actions on this file", currentAction, file);
+                    + "perform additional Expiration Actions on this file", currentAction, file);
                 removed.add(baseName);
             } catch (final Throwable t) {
                 logger.warn("Failed to perform Expiration Action {} on Provenance Event file {} due to {}; will not perform additional "
-                        + "Expiration Actions on this file at this time", currentAction, file, t.toString());
+                    + "Expiration Actions on this file at this time", currentAction, file, t.toString());
                 logger.warn("", t);
                 eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to perform Expiration Action " + currentAction +
-                        " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " +
-                        "on this file at this time");
+                    " on Provenance Event file " + file + " due to " + t.toString() + "; will not perform additional Expiration Actions " +
+                    "on this file at this time");
             }
         }
 
@@ -1131,10 +1131,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     protected int getJournalCount() {
         // determine how many 'journals' we have in the journals directories
         int journalFileCount = 0;
-        for ( final File storageDir : configuration.getStorageDirectories() ) {
+        for (final File storageDir : configuration.getStorageDirectories()) {
             final File journalsDir = new File(storageDir, "journals");
             final File[] journalFiles = journalsDir.listFiles();
-            if ( journalFiles != null ) {
+            if (journalFiles != null) {
                 journalFileCount += journalFiles.length;
             }
         }
@@ -1169,12 +1169,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     writer.close();
                 } catch (final IOException ioe) {
                     logger.warn("Failed to close {} due to {}", writer, ioe.toString());
-                    if ( logger.isDebugEnabled() ) {
+                    if (logger.isDebugEnabled()) {
                         logger.warn("", ioe);
                     }
                 }
             }
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
             }
 
@@ -1263,10 +1263,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             // that is no longer the case.
             if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
                 logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
-                        + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
-                        + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
+                    + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
+                    + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
                 eventReporter.reportEvent(Severity.WARNING, "Provenance Repository", "The rate of the dataflow is "
-                        + "exceeding the provenance recording rate. Slowing down flow to accommodate");
+                    + "exceeding the provenance recording rate. Slowing down flow to accommodate");
 
                 while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
                     // if a shutdown happens while we are in this loop, kill the rollover thread and break
@@ -1293,15 +1293,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     }
 
                     logger.debug("Provenance Repository is still behind. Keeping flow slowed down "
-                            + "to accommodate. Currently, there are {} journal files ({} bytes) and "
-                            + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
+                        + "to accommodate. Currently, there are {} journal files ({} bytes) and "
+                        + "threshold for blocking is {} ({} bytes)", journalFileCount, repoSize, journalCountThreshold, sizeThreshold);
 
                     journalFileCount = getJournalCount();
                     repoSize = getSize(getLogFiles(), 0L);
                 }
 
                 logger.info("Provenance Repository has now caught up with rolling over journal files. Current number of "
-                        + "journal files to be rolled over is {}", journalFileCount);
+                    + "journal files to be rolled over is {}", journalFileCount);
             }
 
             // we've finished rolling over successfully. Create new writers and reset state.
@@ -1335,7 +1335,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             }
 
             for (final File journalFile : journalFiles) {
-                if ( journalFile.isDirectory() ) {
+                if (journalFile.isDirectory()) {
                     continue;
                 }
 
@@ -1403,7 +1403,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
      */
     File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException {
         logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile);
-        if ( this.closed.get() ) {
+        if (this.closed.get()) {
             logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile);
             return null;
         }
@@ -1439,14 +1439,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 // we have all "partial" files and there is already a merged file. Delete the data from the index
                 // because the merge file may not be fully merged. We will re-merge.
                 logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
-                        + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
+                    + "so assuming that the merge did not finish. Repeating procedure in order to ensure consistency.");
 
                 final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
                 try {
                     deleteAction.execute(suggestedMergeFile);
                 } catch (final Exception e) {
                     logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", suggestedMergeFile, e.toString());
-                    if ( logger.isDebugEnabled() ) {
+                    if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
                 }
@@ -1460,18 +1460,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 }
 
                 final File tocFile = TocUtil.getTocFile(suggestedMergeFile);
-                if ( tocFile.exists() && !tocFile.delete() ) {
+                if (tocFile.exists() && !tocFile.delete()) {
                     logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
                         + "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile);
                 }
             }
         } else {
             logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
-                    + "but it did not; assuming that the files were already merged but only some finished deletion "
-                    + "before restart. Deleting remaining partial journal files.", journalFiles);
+                + "but it did not; assuming that the files were already merged but only some finished deletion "
+                + "before restart. Deleting remaining partial journal files.", journalFiles);
 
-            for ( final File file : journalFiles ) {
-                if ( !file.delete() && file.exists() ) {
+            for (final File file : journalFiles) {
+                if (!file.delete() && file.exists()) {
                     logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
                 }
             }
@@ -1529,7 +1529,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 } catch (final EOFException eof) {
                 } catch (final Exception e) {
                     logger.warn("Failed to generate Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't "
-                            + "completely written to the file. This record will be skipped.");
+                        + "completely written to the file. This record will be skipped.");
                     if (logger.isDebugEnabled()) {
                         logger.warn("", e);
                     }
@@ -1544,11 +1544,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                     continue;
                 }
 
-                if ( record.getEventTime() < earliestTimestamp ) {
+                if (record.getEventTime() < earliestTimestamp) {
                     earliestTimestamp = record.getEventTime();
                 }
 
-                if ( record.getEventId() < minEventId ) {
+                if (record.getEventId() < minEventId) {
                     minEventId = record.getEventId();
                 }
 
@@ -1799,7 +1799,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         final int numQueries = querySubmissionMap.size();
         if (numQueries > MAX_UNDELETED_QUERY_RESULTS) {
             throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not "
-                    + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
+                + "been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
         }
 
         if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
@@ -1820,7 +1820,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
                 Long maxEventId = getMaxEventId();
                 if (maxEventId == null) {
-                    result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
+                    result.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
                     maxEventId = 0L;
                 }
                 Long minIndexedId = indexConfig.getMinIdIndexed();
@@ -1830,7 +1830,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
 
                 final long totalNumDocs = maxEventId - minIndexedId;
 
-                result.getResult().update(trimmed, totalNumDocs);
+                result.getResult().update(trimmed, totalNumDocs, 0);
             } else {
                 queryExecService.submit(new GetMostRecentRunnable(query, result));
             }
@@ -1839,18 +1839,18 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             return result;
         }
 
-        final AtomicInteger retrievalCount = new AtomicInteger(0);
         final List<File> indexDirectories = indexConfig.getIndexDirectories(
-                query.getStartDate() == null ? null : query.getStartDate().getTime(),
-                        query.getEndDate() == null ? null : query.getEndDate().getTime());
+            query.getStartDate() == null ? null : query.getStartDate().getTime(),
+            query.getEndDate() == null ? null : query.getEndDate().getTime());
         final AsyncQuerySubmission result = new AsyncQuerySubmission(query, indexDirectories.size());
         querySubmissionMap.put(query.getIdentifier(), result);
 
         if (indexDirectories.isEmpty()) {
-            result.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0L);
+            result.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
         } else {
+            int indexId = 0;
             for (final File indexDir : indexDirectories) {
-                queryExecService.submit(new QueryRunnable(query, result, indexDir, retrievalCount));
+                queryExecService.submit(new QueryRunnable(query, result, indexDir, indexId++));
             }
         }
 
@@ -2024,11 +2024,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     }
 
     Lineage computeLineage(final String flowFileUuid) throws IOException {
-        return computeLineage(Collections.<String>singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
+        return computeLineage(Collections.<String> singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null, 0L, Long.MAX_VALUE);
     }
 
     private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId, final Long startTimestamp,
-            final Long endTimestamp) throws IOException {
+        final Long endTimestamp) throws IOException {
         final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId, startTimestamp, endTimestamp);
         final StandardLineageResult result = submission.getResult();
         while (!result.isFinished()) {
@@ -2051,7 +2051,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     }
 
     private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType,
-            final Long eventId, final long startTimestamp, final long endTimestamp) {
+        final Long eventId, final long startTimestamp, final long endTimestamp) {
         final List<File> indexDirs = indexConfig.getIndexDirectories(startTimestamp, endTimestamp);
         final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirs.size());
         lineageSubmissionMap.put(result.getLineageIdentifier(), result);
@@ -2068,9 +2068,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         try {
             final ProvenanceEventRecord event = getEvent(eventId);
             if (event == null) {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+                submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList());
                 return submission;
             }
 
@@ -2081,13 +2081,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 case REPLAY:
                     return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
                 default:
-                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
                     lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                     submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
                     return submission;
             }
         } catch (final IOException ioe) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
 
             if (ioe.getMessage() == null) {
@@ -2105,9 +2105,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         try {
             final ProvenanceEventRecord event = getEvent(eventId);
             if (event == null) {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-                submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+                submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList());
                 return submission;
             }
 
@@ -2118,14 +2118,14 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 case REPLAY:
                     return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId, event.getLineageStartDate(), event.getEventTime());
                 default: {
-                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+                    final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1);
                     lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                     submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
                     return submission;
                 }
             }
         } catch (final IOException ioe) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
 
             if (ioe.getMessage() == null) {
@@ -2248,7 +2248,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
             // get the max indexed event id
             final Long maxEventId = indexConfig.getMaxIdIndexed();
             if (maxEventId == null) {
-                submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList(), 0);
+                submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList(), 0L, 0);
                 return;
             }
 
@@ -2263,7 +2263,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 final long totalNumDocs = maxEventId - minIndexedId;
 
                 final List<ProvenanceEventRecord> mostRecent = getEvents(startIndex, maxResults);
-                submission.getResult().update(mostRecent, totalNumDocs);
+                // reverse the order so that the newest events come first.
+                Collections.reverse(mostRecent);
+                submission.getResult().update(mostRecent, totalNumDocs, 0);
             } catch (final IOException ioe) {
                 logger.error("Failed to retrieve records from Provenance Repository: " + ioe.toString());
                 if (logger.isDebugEnabled()) {
@@ -2284,24 +2286,28 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
         private final Query query;
         private final AsyncQuerySubmission submission;
         private final File indexDir;
-        private final AtomicInteger retrievalCount;
+        private final int indexId;
 
-        public QueryRunnable(final Query query, final AsyncQuerySubmission submission, final File indexDir, final AtomicInteger retrievalCount) {
+        public QueryRunnable(final Query query, final AsyncQuerySubmission submission, final File indexDir, final int indexId) {
             this.query = query;
             this.submission = submission;
             this.indexDir = indexDir;
-            this.retrievalCount = retrievalCount;
+            this.indexId = indexId;
         }
 
         @Override
         public void run() {
             try {
                 final IndexSearch search = new IndexSearch(PersistentProvenanceRepository.this, indexDir, indexManager, maxAttributeChars);
-                final StandardQueryResult queryResult = search.search(query, retrievalCount, firstEventTimestamp);
-                submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount());
+                final StandardQueryResult queryResult = search.search(query, firstEventTimestamp);
+
+                logger.debug("Merging query results for indexId {}; before merge, num events = {}", indexId, queryResult.getTotalHitCount());
+                submission.getResult().update(queryResult.getMatchingEvents(), queryResult.getTotalHitCount(), indexId);
+                logger.debug("Merging query results for indexId {}; after merge, num events = {}", indexId, queryResult.getTotalHitCount());
+
                 if (queryResult.isFinished()) {
                     logger.info("Successfully executed Query[{}] against Index {}; Search took {} milliseconds; Total Hits = {}",
-                            query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount());
+                        query, indexDir, queryResult.getQueryTime(), queryResult.getTotalHitCount());
                 }
             } catch (final Throwable t) {
                 logger.error("Failed to query Provenance Repository Index {} due to {}", indexDir, t.toString());
@@ -2344,7 +2350,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
                 result.update(matchingRecords);
 
                 logger.info("Successfully created Lineage for FlowFiles with UUIDs {} in {} milliseconds; Lineage contains {} nodes and {} edges",
-                        flowFileUuids, result.getComputationTime(TimeUnit.MILLISECONDS), result.getNodes().size(), result.getEdges().size());
+                    flowFileUuids, result.getComputationTime(TimeUnit.MILLISECONDS), result.getNodes().size(), result.getEdges().size());
             } catch (final Throwable t) {
                 logger.error("Failed to query provenance repository due to {}", t.toString());
                 if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
index c2a7609..e6e78c1 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java
@@ -22,25 +22,23 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TopDocs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.provenance.serialization.RecordReader;
 import org.apache.nifi.provenance.serialization.RecordReaders;
 import org.apache.nifi.provenance.toc.TocReader;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexableField;
-import org.apache.lucene.search.ScoreDoc;
-import org.apache.lucene.search.TopDocs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,11 +49,7 @@ public class DocsReader {
     }
 
     public Set<ProvenanceEventRecord> read(final TopDocs topDocs, final IndexReader indexReader, final Collection<Path> allProvenanceLogFiles,
-            final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
-        if (retrievalCount.get() >= maxResults) {
-            return Collections.emptySet();
-        }
-
+        final int maxResults, final int maxAttributeChars) throws IOException {
         final long start = System.nanoTime();
         final int numDocs = Math.min(topDocs.scoreDocs.length, maxResults);
         final List<Document> docs = new ArrayList<>(numDocs);
@@ -68,13 +62,13 @@ public class DocsReader {
 
         final long readDocuments = System.nanoTime() - start;
         logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
-        return read(docs, allProvenanceLogFiles, retrievalCount, maxResults, maxAttributeChars);
+        return read(docs, allProvenanceLogFiles, maxResults, maxAttributeChars);
     }
 
 
     private long getByteOffset(final Document d, final RecordReader reader) {
         final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
-        if ( blockField != null ) {
+        if (blockField != null) {
             final int blockIndex = blockField.numericValue().intValue();
             final TocReader tocReader = reader.getTocReader();
             return tocReader.getBlockOffset(blockIndex);
@@ -86,21 +80,21 @@ public class DocsReader {
 
     private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
         final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
-        if ( blockField == null ) {
+        if (blockField == null) {
             reader.skipTo(getByteOffset(d, reader));
         } else {
             reader.skipToBlock(blockField.numericValue().intValue());
         }
 
         StandardProvenanceEventRecord record;
-        while ( (record = reader.nextRecord()) != null) {
+        while ((record = reader.nextRecord()) != null) {
             final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
-            if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
+            if (idField == null || idField.numericValue().longValue() == record.getEventId()) {
                 break;
             }
         }
 
-        if ( record == null ) {
+        if (record == null) {
             throw new IOException("Failed to find Provenance Event " + d);
         } else {
             return record;
@@ -109,10 +103,7 @@ public class DocsReader {
 
 
     public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles,
-        final AtomicInteger retrievalCount, final int maxResults, final int maxAttributeChars) throws IOException {
-        if (retrievalCount.get() >= maxResults) {
-            return Collections.emptySet();
-        }
+        final int maxResults, final int maxAttributeChars) throws IOException {
 
         LuceneUtil.sortDocsForRetrieval(docs);
 
@@ -129,7 +120,7 @@ public class DocsReader {
         try {
             for (final Document d : docs) {
                 final String storageFilename = d.getField(FieldNames.STORAGE_FILENAME).stringValue();
-                if ( storageFilesToSkip.contains(storageFilename) ) {
+                if (storageFilesToSkip.contains(storageFilename)) {
                     continue;
                 }
 
@@ -137,10 +128,6 @@ public class DocsReader {
                     if (reader != null && storageFilename.equals(lastStorageFilename)) {
                         matchingRecords.add(getRecord(d, reader));
                         eventsReadThisFile++;
-
-                        if ( retrievalCount.incrementAndGet() >= maxResults ) {
-                            break;
-                        }
                     } else {
                         logger.debug("Opening log file {}", storageFilename);
 
@@ -152,14 +139,14 @@ public class DocsReader {
                         final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
                         if (potentialFiles.isEmpty()) {
                             logger.warn("Could not find Provenance Log File with basename {} in the "
-                                    + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
+                                + "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
                             storageFilesToSkip.add(storageFilename);
                             continue;
                         }
 
                         if (potentialFiles.size() > 1) {
                             throw new FileNotFoundException("Found multiple Provenance Log Files with basename " +
-                                    storageFilename + " in the Provenance Repository");
+                                storageFilename + " in the Provenance Repository");
                         }
 
                         for (final File file : potentialFiles) {
@@ -171,10 +158,6 @@ public class DocsReader {
                                 reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles, maxAttributeChars);
                                 matchingRecords.add(getRecord(d, reader));
                                 eventsReadThisFile = 1;
-
-                                if ( retrievalCount.incrementAndGet() >= maxResults ) {
-                                    break;
-                                }
                             } catch (final IOException e) {
                                 throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
                             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
index 7fcd8ab..c0ca8a7 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexSearch.java
@@ -19,17 +19,22 @@ package org.apache.nifi.provenance.lucene;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortField.Type;
 import org.apache.lucene.search.TopDocs;
 import org.apache.nifi.provenance.PersistentProvenanceRepository;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.SearchableFields;
 import org.apache.nifi.provenance.StandardQueryResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +53,7 @@ public class IndexSearch {
         this.maxAttributeChars = maxAttributeChars;
     }
 
-    public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final AtomicInteger retrievedCount, final long firstEventTimestamp) throws IOException {
+    public StandardQueryResult search(final org.apache.nifi.provenance.search.Query provenanceQuery, final long firstEventTimestamp) throws IOException {
         if (!indexDirectory.exists() && !indexDirectory.mkdirs()) {
             throw new IOException("Unable to create Indexing Directory " + indexDirectory);
         }
@@ -57,7 +62,6 @@ public class IndexSearch {
         }
 
         final StandardQueryResult sqr = new StandardQueryResult(provenanceQuery, 1);
-        final Set<ProvenanceEventRecord> matchingRecords;
 
         // we need to set the start date because if we do not, the first index may still have events that have aged off from
         // the repository, and we don't want those events to count toward the total number of matches.
@@ -77,38 +81,47 @@ public class IndexSearch {
             final long searchStartNanos = System.nanoTime();
             final long openSearcherNanos = searchStartNanos - start;
 
-            final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults());
+            final Sort sort = new Sort(new SortField(SearchableFields.Identifier.getSearchableFieldName(), Type.LONG, true));
+            final TopDocs topDocs = searcher.search(luceneQuery, provenanceQuery.getMaxResults(), sort);
             final long finishSearch = System.nanoTime();
             final long searchNanos = finishSearch - searchStartNanos;
 
             logger.debug("Searching {} took {} millis; opening searcher took {} millis", this,
-                    TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
+                TimeUnit.NANOSECONDS.toMillis(searchNanos), TimeUnit.NANOSECONDS.toMillis(openSearcherNanos));
 
             if (topDocs.totalHits == 0) {
-                sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
+                sqr.update(Collections.<ProvenanceEventRecord> emptyList(), 0, 0);
                 return sqr;
             }
 
             final DocsReader docsReader = new DocsReader(repository.getConfiguration().getStorageDirectories());
-            matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(), retrievedCount,
+            final Set<ProvenanceEventRecord> matchingRecords = docsReader.read(topDocs, searcher.getIndexReader(), repository.getAllLogFiles(),
                 provenanceQuery.getMaxResults(), maxAttributeChars);
 
             final long readRecordsNanos = System.nanoTime() - finishSearch;
             logger.debug("Reading {} records took {} millis for {}", matchingRecords.size(), TimeUnit.NANOSECONDS.toMillis(readRecordsNanos), this);
 
-            sqr.update(matchingRecords, topDocs.totalHits);
+            // The records returned are going to be in a sorted set. The sort order will be dependent on
+            // the ID of the events, which is also approximately the same as the timestamp of the event (i.e.
+            // it's ordered by the time when the event was inserted into the repo, not the time when the event took
+            // place). We want to reverse this so that we get the newest events first, so we have to first create a
+            // new List object to hold the events, and then reverse the list.
+            final List<ProvenanceEventRecord> recordList = new ArrayList<>(matchingRecords);
+            Collections.reverse(recordList);
+
+            sqr.update(recordList, topDocs.totalHits, 0);
             return sqr;
         } catch (final FileNotFoundException e) {
             // nothing has been indexed yet, or the data has already aged off
             logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, e);
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.warn("", e);
             }
 
-            sqr.update(Collections.<ProvenanceEventRecord>emptyList(), 0);
+            sqr.update(Collections.<ProvenanceEventRecord> emptyList(), 0, 0);
             return sqr;
         } finally {
-            if ( searcher != null ) {
+            if (searcher != null) {
                 indexManager.returnIndexSearcher(indexDirectory, searcher);
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
index e9e6e63..e1996f6 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/LineageQuery.java
@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.BooleanClause.Occur;
@@ -95,11 +94,11 @@ public class LineageQuery {
 
                 final DocsReader docsReader = new DocsReader(repo.getConfiguration().getStorageDirectories());
                 final Set<ProvenanceEventRecord> recs = docsReader.read(uuidQueryTopDocs, searcher.getIndexReader(), repo.getAllLogFiles(),
-                    new AtomicInteger(0), Integer.MAX_VALUE, maxAttributeChars);
+                    Integer.MAX_VALUE, maxAttributeChars);
 
                 final long readDocsEnd = System.nanoTime();
                 logger.debug("Finished Lineage Query against {}; Lucene search took {} millis, reading records took {} millis",
-                        indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
+                    indexDirectory, TimeUnit.NANOSECONDS.toMillis(searchEnd - searchStart), TimeUnit.NANOSECONDS.toMillis(readDocsEnd - searchEnd));
 
                 return recs;
             } finally {
@@ -108,7 +107,7 @@ public class LineageQuery {
         } catch (final FileNotFoundException fnfe) {
             // nothing has been indexed yet, or the data has already aged off
             logger.warn("Attempted to search Provenance Index {} but could not find the file due to {}", indexDirectory, fnfe);
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.warn("", fnfe);
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 6875743..036e97f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileFilter;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -117,16 +118,16 @@ public class TestPersistentProvenanceRepository {
         // Delete all of the storage files. We do this in order to clean up the tons of files that
         // we create but also to ensure that we have closed all of the file handles. If we leave any
         // streams open, for instance, this will throw an IOException, causing our unit test to fail.
-        for ( final File storageDir : config.getStorageDirectories() ) {
+        for (final File storageDir : config.getStorageDirectories()) {
             int i;
-            for (i=0; i < 3; i++) {
+            for (i = 0; i < 3; i++) {
                 try {
                     FileUtils.deleteFile(storageDir, true);
                     break;
                 } catch (final IOException ioe) {
                     // if there is a virus scanner, etc. running in the background we may not be able to
                     // delete the file. Wait a sec and try again.
-                    if ( i == 2 ) {
+                    if (i == 2) {
                         throw ioe;
                     } else {
                         try {
@@ -441,7 +442,7 @@ public class TestPersistentProvenanceRepository {
         repo.waitForRollover();
 
         final Query query = new Query(UUID.randomUUID().toString());
-        //        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
+        // query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000*"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.Filename, "file-*"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "12?4"));
         query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.TransitURI, "nifi://*"));
@@ -603,7 +604,7 @@ public class TestPersistentProvenanceRepository {
 
         repo.purgeOldEvents();
 
-        Thread.sleep(2000L);    // purge is async. Give it time to do its job.
+        Thread.sleep(2000L); // purge is async. Give it time to do its job.
 
         query.setMaxResults(100);
         final QuerySubmission noResultSubmission = repo.submitQuery(query);
@@ -614,6 +615,152 @@ public class TestPersistentProvenanceRepository {
         assertEquals(0, noResultSubmission.getResult().getTotalHitCount());
     }
 
+
+    @Test
+    public void testEventsAreOrdered() throws IOException, InterruptedException, ParseException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        repo.initialize(getEventReporter());
+
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(System.currentTimeMillis());
+            repo.registerEvent(builder.build());
+            Thread.sleep(20);
+        }
+
+        // Give time for rollover to happen
+        repo.waitForRollover();
+
+        // Perform a "Most Recent Events" Query
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.setMaxResults(100);
+
+        final QueryResult result = repo.queryEvents(query);
+        assertEquals(10, result.getMatchingEvents().size());
+
+        final List<ProvenanceEventRecord> matchingEvents = result.getMatchingEvents();
+        long timestamp = matchingEvents.get(0).getEventTime();
+
+        for (final ProvenanceEventRecord record : matchingEvents) {
+            assertTrue(record.getEventTime() <= timestamp);
+            timestamp = record.getEventTime();
+        }
+
+        // Perform a Query for a particular component, so that this doesn't just get the most recent events
+        // and has to actually hit Lucene.
+        final Query query2 = new Query(UUID.randomUUID().toString());
+        query2.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234"));
+        query2.setMaxResults(100);
+        final QueryResult result2 = repo.queryEvents(query2);
+        assertEquals(10, result2.getMatchingEvents().size());
+
+        final List<ProvenanceEventRecord> matchingEvents2 = result2.getMatchingEvents();
+        timestamp = matchingEvents2.get(0).getEventTime();
+
+        for (final ProvenanceEventRecord record : matchingEvents2) {
+            assertTrue(record.getEventTime() <= timestamp);
+            timestamp = record.getEventTime();
+        }
+    }
+
+
+    @Test
+    public void testEventsAreOrderedAcrossMultipleIndexes() throws IOException, InterruptedException, ParseException {
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxRecordLife(30, TimeUnit.SECONDS);
+        config.setMaxStorageCapacity(1024L * 1024L);
+        config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+        config.setMaxEventFileCapacity(1024L * 1024L);
+        config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+        config.setDesiredIndexSize(1L);
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+        repo.initialize(getEventReporter());
+
+        final String uuid = "00000000-0000-0000-0000-000000000000";
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(System.currentTimeMillis());
+            repo.registerEvent(builder.build());
+            Thread.sleep(20);
+        }
+
+        // Give time for rollover to happen
+        repo.waitForRollover();
+
+        for (int i = 0; i < 10; i++) {
+            builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+            builder.setEventTime(System.currentTimeMillis());
+            repo.registerEvent(builder.build());
+            Thread.sleep(20);
+        }
+
+        repo.waitForRollover();
+
+        // Verify that multiple indexes exist
+        final File storageDir = config.getStorageDirectories().get(0);
+        final File[] subDirs = storageDir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return name.startsWith("index-");
+            }
+        });
+        assertEquals(2, subDirs.length);
+
+        // Perform a Query for a particular component, so that this doesn't just get the most recent events
+        // and has to actually hit Lucene.
+        final Query query = new Query(UUID.randomUUID().toString());
+        query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.ComponentID, "1234"));
+        query.setMaxResults(100);
+        final QueryResult result = repo.queryEvents(query);
+        assertEquals(20, result.getMatchingEvents().size());
+
+        final List<ProvenanceEventRecord> matchingEvents = result.getMatchingEvents();
+        long timestamp = matchingEvents.get(0).getEventTime();
+
+        for (final ProvenanceEventRecord record : matchingEvents) {
+            assertTrue(record.getEventTime() <= timestamp);
+            timestamp = record.getEventTime();
+        }
+    }
+
+
     @Test
     public void testIndexAndCompressOnRolloverAndSubsequentEmptySearch() throws IOException, InterruptedException, ParseException {
         final RepositoryConfiguration config = createConfiguration();
@@ -939,7 +1086,7 @@ public class TestPersistentProvenanceRepository {
         config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
         config.setMaxEventFileCapacity(1024L * 1024L);
         config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-        config.setDesiredIndexSize(10);  // force new index to be created for each rollover
+        config.setDesiredIndexSize(10); // force new index to be created for each rollover
 
         repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
         repo.initialize(getEventReporter());
@@ -961,7 +1108,7 @@ public class TestPersistentProvenanceRepository {
         for (int i = 0; i < 10; i++) {
             attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
             builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
-            builder.setEventTime(10L);  // make sure the events are destroyed when we call purge
+            builder.setEventTime(10L); // make sure the events are destroyed when we call purge
             repo.registerEvent(builder.build());
         }
 
@@ -1019,7 +1166,7 @@ public class TestPersistentProvenanceRepository {
     @Test
     public void testBackPressure() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();
-        config.setMaxEventFileCapacity(1L);  // force rollover on each record.
+        config.setMaxEventFileCapacity(1L); // force rollover on each record.
         config.setJournalCount(1);
 
         final AtomicInteger journalCountRef = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf8ca3dc/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
index f4f9d12..ac13f08 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java
@@ -373,7 +373,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
     }
 
     public Lineage computeLineage(final String flowFileUUID) throws IOException {
-        return computeLineage(Collections.<String>singleton(flowFileUUID), LineageComputationType.FLOWFILE_LINEAGE, null);
+        return computeLineage(Collections.<String> singleton(flowFileUUID), LineageComputationType.FLOWFILE_LINEAGE, null);
     }
 
     private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId) throws IOException {
@@ -411,9 +411,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
     public ComputeLineageSubmission submitExpandParents(final long eventId) {
         final ProvenanceEventRecord event = getEvent(eventId);
         if (event == null) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-            submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+            submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList());
             return submission;
         }
 
@@ -424,7 +424,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
             case CLONE:
                 return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId);
             default: {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                 submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded");
                 return submission;
@@ -440,9 +440,9 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
     public ComputeLineageSubmission submitExpandChildren(final long eventId) {
         final ProvenanceEventRecord event = getEvent(eventId);
         if (event == null) {
-            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+            final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
             lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
-            submission.getResult().update(Collections.<ProvenanceEventRecord>emptyList());
+            submission.getResult().update(Collections.<ProvenanceEventRecord> emptyList());
             return submission;
         }
 
@@ -453,7 +453,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
             case CLONE:
                 return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId);
             default: {
-                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1);
+                final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String> emptyList(), 1);
                 lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);
                 submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
                 return submission;
@@ -526,7 +526,7 @@ public class VolatileProvenanceRepository implements ProvenanceEventRepository {
 
             }, IterationDirection.BACKWARD);
 
-            submission.getResult().update(matchingRecords, matchingCount.get());
+            submission.getResult().update(matchingRecords, matchingCount.get(), 0);
         }
     }