You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/05 18:59:23 UTC

nifi git commit: NIFI-1877, NIFI-1306: Add fields to FlowFile for FIFO Prioritizer, Oldest/Newest FlowFile first prioritizers to work properly

Repository: nifi
Updated Branches:
  refs/heads/master 7e63b0036 -> 32b8a9b9f


NIFI-1877, NIFI-1306: Add fields to FlowFile for FIFO Prioritizer, Oldest/Newest FlowFile first prioritizers to work properly

This closes #546


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32b8a9b9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32b8a9b9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32b8a9b9

Branch: refs/heads/master
Commit: 32b8a9b9f4c07b4f5d3049c33880b547481cbb24
Parents: 7e63b00
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jul 5 14:56:22 2016 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Tue Jul 5 14:59:08 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/flowfile/FlowFile.java | 26 +++++++++++
 .../java/org/apache/nifi/util/MockFlowFile.java | 10 +++++
 .../nifi/controller/FileSystemSwapManager.java  | 27 ++++++++++--
 .../apache/nifi/controller/FlowController.java  |  2 +-
 .../repository/StandardFlowFileRecord.java      | 25 ++++++++++-
 .../repository/StandardProcessSession.java      | 23 +++++++++-
 .../WriteAheadFlowFileRepository.java           | 45 +++++++++++++++++---
 .../controller/TestFileSystemSwapManager.java   | 10 +++++
 .../controller/TestStandardFlowFileQueue.java   | 10 +++++
 .../org/apache/nifi/provenance/TestUtil.java    | 10 +++++
 .../TestVolatileProvenanceRepository.java       | 10 +++++
 .../prioritizer/FirstInFirstOutPrioritizer.java |  7 ++-
 .../NewestFlowFileFirstPrioritizer.java         |  2 +-
 .../OldestFlowFileFirstPrioritizer.java         |  2 +-
 14 files changed, 193 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
index 1288d21..43fbf20 100644
--- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
+++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
@@ -53,6 +53,19 @@ public interface FlowFile extends Comparable<FlowFile> {
     long getLineageStartDate();
 
     /**
+     * Returns a 64-bit integer that indicates the order in which the FlowFile was added to the
+     * flow with respect to other FlowFiles that have the same last lineage start date.
+     * I.e., if two FlowFiles return the same value for {@link #getLineageStartDate()}, the order
+     * in which those FlowFiles were added to the flow can be determined by looking at the result of
+     * this method. However, no guarantee is made by this method about the ordering of FlowFiles
+     * that have different values for the {@link #getLineageStartDate()} method.
+     *
+     * @return the index that can be used to compare two FlowFiles with the same lineage start date
+     * to understand the order in which the two FlowFiles were enqueued.
+     */
+    long getLineageStartIndex();
+
+    /**
      * @return the time at which the FlowFile was most recently added to a
      * FlowFile queue, or {@code null} if the FlowFile has never been enqueued.
      * This value will always be populated before it is passed to a
@@ -61,6 +74,19 @@ public interface FlowFile extends Comparable<FlowFile> {
     Long getLastQueueDate();
 
     /**
+     * Returns a 64-bit integer that indicates the order in which the FlowFile was added to the
+     * FlowFile queue with respect to other FlowFiles that have the same last queue date.
+     * I.e., if two FlowFiles return the same value for {@link #getLastQueueDate()}, the order
+     * in which those FlowFiles were enqueued can be determined by looking at the result of
+     * this method. However, no guarantee is made by this method about the ordering of FlowFiles
+     * that have different values for the {@link #getLastQueueDate()} method.
+     *
+     * @return the index that can be used to compare two FlowFiles with the same last queue date
+     * to understand the order in which the two FlowFiles were enqueued.
+     */
+    long getQueueDateIndex();
+
+    /**
      * <p>
      * If a FlowFile is derived from multiple "parent" FlowFiles, all of the
      * parents' Lineage Identifiers will be in the set.

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 41bcc74..049c65b 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -290,4 +290,14 @@ public class MockFlowFile implements FlowFileRecord {
     public long getContentClaimOffset() {
         return 0;
     }
+
+    @Override
+    public long getLineageStartIndex() {
+        return 0;
+    }
+
+    @Override
+    public long getQueueDateIndex() {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 156389b..5234f0e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -74,7 +74,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
     private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
 
-    public static final int SWAP_ENCODING_VERSION = 8;
+    public static final int SWAP_ENCODING_VERSION = 9;
     public static final String EVENT_CATEGORY = "Swap FlowFiles";
     private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
 
@@ -328,7 +328,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                 }
 
                 out.writeLong(flowFile.getLineageStartDate());
+                out.writeLong(flowFile.getLineageStartIndex());
                 out.writeLong(flowFile.getLastQueueDate());
+                out.writeLong(flowFile.getQueueDateIndex());
                 out.writeLong(flowFile.getSize());
 
                 final ContentClaim claim = flowFile.getContentClaim();
@@ -447,10 +449,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                         lineageIdentifiers.add(in.readUTF());
                     }
                     ffBuilder.lineageIdentifiers(lineageIdentifiers);
-                    ffBuilder.lineageStartDate(in.readLong());
+
+                    // version 9 adds in a 'lineage start index'
+                    final long lineageStartDate = in.readLong();
+                    final long lineageStartIndex;
+                    if (serializationVersion > 8) {
+                        lineageStartIndex = in.readLong();
+                    } else {
+                        lineageStartIndex = 0L;
+                    }
+
+                    ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
 
                     if (serializationVersion > 5) {
-                        ffBuilder.lastQueueDate(in.readLong());
+                        // Version 9 adds in a 'queue date index'
+                        final long lastQueueDate = in.readLong();
+                        final long queueDateIndex;
+                        if (serializationVersion > 8) {
+                            queueDateIndex = in.readLong();
+                        } else {
+                            queueDateIndex = 0L;
+                        }
+
+                        ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3d2eca2..8c710fa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3671,7 +3671,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             .entryDate(System.currentTimeMillis())
             .id(flowFileRepository.getNextFlowFileSequence())
             .lineageIdentifiers(lineageIdentifiers)
-            .lineageStartDate(event.getLineageStartDate())
+            .lineageStart(event.getLineageStartDate(), 0L)
             .size(contentSize.longValue())
             // Create a new UUID and add attributes indicating that this is a replay
             .addAttribute("flowfile.replay", "true")

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index d3d8d40..cd1ba14 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -48,6 +48,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
     private final long id;
     private final long entryDate;
     private final long lineageStartDate;
+    private final long lineageStartIndex;
     private final Set<String> lineageIdentifiers;
     private final long size;
     private final long penaltyExpirationMs;
@@ -55,18 +56,21 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
     private final ContentClaim claim;
     private final long claimOffset;
     private final long lastQueueDate;
+    private final long queueDateIndex;
 
     private StandardFlowFileRecord(final Builder builder) {
         this.id = builder.bId;
         this.attributes = builder.bAttributes;
         this.entryDate = builder.bEntryDate;
         this.lineageStartDate = builder.bLineageStartDate;
+        this.lineageStartIndex = builder.bLineageStartIndex;
         this.lineageIdentifiers = builder.bLineageIdentifiers;
         this.penaltyExpirationMs = builder.bPenaltyExpirationMs;
         this.size = builder.bSize;
         this.claim = builder.bClaim;
         this.claimOffset = builder.bClaimOffset;
         this.lastQueueDate = builder.bLastQueueDate;
+        this.queueDateIndex = builder.bQueueDateIndex;
     }
 
     @Override
@@ -124,6 +128,16 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
         return this.claimOffset;
     }
 
+    @Override
+    public long getLineageStartIndex() {
+        return lineageStartIndex;
+    }
+
+    @Override
+    public long getQueueDateIndex() {
+        return queueDateIndex;
+    }
+
     /**
      * Provides the natural ordering for FlowFile objects which is based on their identifier.
      *
@@ -167,6 +181,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
         private long bId;
         private long bEntryDate = System.currentTimeMillis();
         private long bLineageStartDate = bEntryDate;
+        private long bLineageStartIndex = 0L;
         private final Set<String> bLineageIdentifiers = new HashSet<>();
         private long bPenaltyExpirationMs = -1L;
         private long bSize = 0L;
@@ -174,6 +189,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
         private ContentClaim bClaim = null;
         private long bClaimOffset = 0L;
         private long bLastQueueDate = System.currentTimeMillis();
+        private long bQueueDateIndex = 0L;
 
         public Builder id(final long id) {
             bId = id;
@@ -204,8 +220,9 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
             return this;
         }
 
-        public Builder lineageStartDate(final long lineageStartDate) {
+        public Builder lineageStart(final long lineageStartDate, final long lineageStartIndex) {
             bLineageStartDate = lineageStartDate;
+            bLineageStartIndex = lineageStartIndex;
             return this;
         }
 
@@ -298,8 +315,9 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
             return this;
         }
 
-        public Builder lastQueueDate(final long lastQueueDate) {
+        public Builder lastQueued(final long lastQueueDate, final long queueDateIndex) {
             this.bLastQueueDate = lastQueueDate;
+            this.bQueueDateIndex = queueDateIndex;
             return this;
         }
 
@@ -310,6 +328,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
             bId = specFlowFile.getId();
             bEntryDate = specFlowFile.getEntryDate();
             bLineageStartDate = specFlowFile.getLineageStartDate();
+            bLineageStartIndex = specFlowFile.getLineageStartIndex();
             bLineageIdentifiers.clear();
             bLineageIdentifiers.addAll(specFlowFile.getLineageIdentifiers());
             bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis();
@@ -317,6 +336,8 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
             bAttributes.putAll(specFlowFile.getAttributes());
             bClaim = specFlowFile.getContentClaim();
             bClaimOffset = specFlowFile.getContentClaimOffset();
+            bLastQueueDate = specFlowFile.getLastQueueDate();
+            bQueueDateIndex = specFlowFile.getQueueDateIndex();
 
             return this;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 062e515..f6eca71 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -92,6 +92,7 @@ import org.slf4j.LoggerFactory;
 public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
 
     private static final AtomicLong idGenerator = new AtomicLong(0L);
+    private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
 
     // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
     public static final int VERBOSE_LOG_THRESHOLD = 10;
@@ -1526,11 +1527,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         return newFile;
     }
 
+    private void updateLastQueuedDate(final StandardRepositoryRecord record) {
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
+            .lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build();
+        record.setWorking(newFile);
+    }
+
     @Override
     public void transfer(final FlowFile flowFile, final Relationship relationship) {
         validateRecordState(flowFile);
         final StandardRepositoryRecord record = records.get(flowFile);
         record.setTransferRelationship(relationship);
+        updateLastQueuedDate(record);
         final int numDestinations = context.getConnections(relationship).size();
         final int multiplier = Math.max(1, numDestinations);
 
@@ -1560,6 +1568,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
         }
         record.setTransferRelationship(Relationship.SELF);
+        updateLastQueuedDate(record);
     }
 
     @Override
@@ -1589,6 +1598,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         for (final FlowFile flowFile : flowFiles) {
             final StandardRepositoryRecord record = records.get(flowFile);
             record.setTransferRelationship(relationship);
+            updateLastQueuedDate(record);
+
             contentSize += flowFile.getSize() * multiplier;
         }
 
@@ -2482,7 +2493,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final Set<String> lineageIdentifiers = new HashSet<>(parent.getLineageIdentifiers());
         lineageIdentifiers.add(parent.getAttribute(CoreAttributes.UUID.key()));
         fFileBuilder.lineageIdentifiers(lineageIdentifiers);
-        fFileBuilder.lineageStartDate(parent.getLineageStartDate());
+        fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
         fFileBuilder.addAttributes(newAttributes);
 
         final FlowFileRecord fFile = fFileBuilder.build();
@@ -2516,6 +2527,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         }
 
+        // find the smallest lineage start index that has the same lineage start date as the one we've chosen.
+        long lineageStartIndex = 0L;
+        for (final FlowFile parent : parents) {
+            if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) {
+                lineageStartIndex = parent.getLineageStartIndex();
+            }
+        }
+
         newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
         newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
         newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
@@ -2523,7 +2542,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
             .addAttributes(newAttributes)
             .lineageIdentifiers(lineageIdentifiers)
-            .lineageStartDate(lineageStartDate)
+            .lineageStart(lineageStartDate, lineageStartIndex)
             .build();
 
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 211baa7..0f40cbb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -388,6 +388,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     }
 
     private static class WriteAheadRecordSerde implements SerDe<RepositoryRecord> {
+        private static final int CURRENT_ENCODING_VERSION = 8;
 
         public static final byte ACTION_CREATE = 0;
         public static final byte ACTION_UPDATE = 1;
@@ -474,9 +475,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
             }
 
             out.writeLong(flowFile.getLineageStartDate());
+            out.writeLong(flowFile.getLineageStartIndex());
 
             final Long queueDate = flowFile.getLastQueueDate();
             out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
+            out.writeLong(flowFile.getQueueDateIndex());
             out.writeLong(flowFile.getSize());
 
             if (associatedQueue == null) {
@@ -552,10 +555,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                     lineageIdentifiers.add(in.readUTF());
                 }
                 ffBuilder.lineageIdentifiers(lineageIdentifiers);
-                ffBuilder.lineageStartDate(in.readLong());
+
+                final long lineageStartDate = in.readLong();
+                final long lineageStartIndex;
+                if (version > 7) {
+                    lineageStartIndex = in.readLong();
+                } else {
+                    lineageStartIndex = 0L;
+                }
+                ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
 
                 if (version > 5) {
-                    ffBuilder.lastQueueDate(in.readLong());
+                    final long lastQueueDate = in.readLong();
+                    final long queueDateIndex;
+                    if (version > 7) {
+                        queueDateIndex = in.readLong();
+                    } else {
+                        queueDateIndex = 0L;
+                    }
+
+                    ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
                 }
             }
 
@@ -648,10 +667,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
                     lineageIdentifiers.add(in.readUTF());
                 }
                 ffBuilder.lineageIdentifiers(lineageIdentifiers);
-                ffBuilder.lineageStartDate(in.readLong());
+
+                final long lineageStartDate = in.readLong();
+                final long lineageStartIndex;
+                if (version > 7) {
+                    lineageStartIndex = in.readLong();
+                } else {
+                    lineageStartIndex = 0L;
+                }
+                ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
 
                 if (version > 5) {
-                    ffBuilder.lastQueueDate(in.readLong());
+                    final long lastQueueDate = in.readLong();
+                    final long queueDateIndex;
+                    if (version > 7) {
+                        queueDateIndex = in.readLong();
+                    } else {
+                        queueDateIndex = 0L;
+                    }
+
+                    ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
                 }
             }
 
@@ -872,7 +907,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
         @Override
         public int getVersion() {
-            return 7;
+            return CURRENT_ENCODING_VERSION;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index fcfd524..c27fad3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -242,5 +242,15 @@ public class TestFileSystemSwapManager {
         public long getContentClaimOffset() {
             return 0;
         }
+
+        @Override
+        public long getLineageStartIndex() {
+            return 0;
+        }
+
+        @Override
+        public long getQueueDateIndex() {
+            return 0;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 18c55c6..85d9838 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -692,6 +692,16 @@ public class TestStandardFlowFileQueue {
         public long getContentClaimOffset() {
             return 0;
         }
+
+        @Override
+        public long getLineageStartIndex() {
+            return 0;
+        }
+
+        @Override
+        public long getQueueDateIndex() {
+            return 0;
+        }
     }
 
     private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
index 26766d6..a34d78b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -77,6 +77,16 @@ public class TestUtil {
             public int compareTo(final FlowFile o) {
                 return 0;
             }
+
+            @Override
+            public long getLineageStartIndex() {
+                return 0;
+            }
+
+            @Override
+            public long getQueueDateIndex() {
+                return 0;
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
index 7a7a334..4daa31c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
@@ -172,6 +172,16 @@ public class TestVolatileProvenanceRepository {
             public Long getLastQueueDate() {
                 return System.currentTimeMillis();
             }
+
+            @Override
+            public long getLineageStartIndex() {
+                return 0;
+            }
+
+            @Override
+            public long getQueueDateIndex() {
+                return 0;
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java
index 6107e87..08437c7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java
@@ -31,7 +31,12 @@ public class FirstInFirstOutPrioritizer implements FlowFilePrioritizer {
             return 1;
         }
 
-        return o1.getLastQueueDate().compareTo(o2.getLastQueueDate());
+        final int dateComparison = o1.getLastQueueDate().compareTo(o2.getLastQueueDate());
+        if (dateComparison != 0) {
+            return dateComparison;
+        }
+
+        return Long.compare(o1.getQueueDateIndex(), o2.getQueueDateIndex());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java
index 515993e..4893cf0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java
@@ -36,7 +36,7 @@ public class NewestFlowFileFirstPrioritizer implements FlowFilePrioritizer {
             return lineageDateCompare;
         }
 
-        return Long.compare(o2.getId(), o1.getId());
+        return Long.compare(o2.getLineageStartIndex(), o1.getLineageStartIndex());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
index b4781f3..386d912 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
@@ -36,7 +36,7 @@ public class OldestFlowFileFirstPrioritizer implements FlowFilePrioritizer {
             return lineageDateCompare;
         }
 
-        return Long.compare(o1.getId(), o2.getId());
+        return Long.compare(o1.getLineageStartIndex(), o2.getLineageStartIndex());
     }
 
 }