You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/05 18:59:23 UTC
nifi git commit: NIFI-1877,
NIFI-1306: Add fields to FlowFile for FIFO Prioritizer,
Oldest/Newest FlowFile first prioritizers to work properly
Repository: nifi
Updated Branches:
refs/heads/master 7e63b0036 -> 32b8a9b9f
NIFI-1877, NIFI-1306: Add fields to FlowFile for FIFO Prioritizer, Oldest/Newest FlowFile first prioritizers to work properly
This closes #546
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32b8a9b9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32b8a9b9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32b8a9b9
Branch: refs/heads/master
Commit: 32b8a9b9f4c07b4f5d3049c33880b547481cbb24
Parents: 7e63b00
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jul 5 14:56:22 2016 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Tue Jul 5 14:59:08 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/nifi/flowfile/FlowFile.java | 26 +++++++++++
.../java/org/apache/nifi/util/MockFlowFile.java | 10 +++++
.../nifi/controller/FileSystemSwapManager.java | 27 ++++++++++--
.../apache/nifi/controller/FlowController.java | 2 +-
.../repository/StandardFlowFileRecord.java | 25 ++++++++++-
.../repository/StandardProcessSession.java | 23 +++++++++-
.../WriteAheadFlowFileRepository.java | 45 +++++++++++++++++---
.../controller/TestFileSystemSwapManager.java | 10 +++++
.../controller/TestStandardFlowFileQueue.java | 10 +++++
.../org/apache/nifi/provenance/TestUtil.java | 10 +++++
.../TestVolatileProvenanceRepository.java | 10 +++++
.../prioritizer/FirstInFirstOutPrioritizer.java | 7 ++-
.../NewestFlowFileFirstPrioritizer.java | 2 +-
.../OldestFlowFileFirstPrioritizer.java | 2 +-
14 files changed, 193 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
index 1288d21..43fbf20 100644
--- a/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
+++ b/nifi-api/src/main/java/org/apache/nifi/flowfile/FlowFile.java
@@ -53,6 +53,19 @@ public interface FlowFile extends Comparable<FlowFile> {
long getLineageStartDate();
/**
+ * Returns a 64-bit integer that indicates the order in which the FlowFile was added to the
+ * flow with respect to other FlowFiles that have the same lineage start date.
+ * I.e., if two FlowFiles return the same value for {@link #getLineageStartDate()}, the order
+ * in which those FlowFiles were added to the flow can be determined by looking at the result of
+ * this method. However, no guarantee is made by this method about the ordering of FlowFiles
+ * that have different values for the {@link #getLineageStartDate()} method.
+ *
+ * @return the index that can be used to compare two FlowFiles with the same lineage start date
+ * to understand the order in which the two FlowFiles were added to the flow.
+ */
+ long getLineageStartIndex();
+
+ /**
* @return the time at which the FlowFile was most recently added to a
* FlowFile queue, or {@code null} if the FlowFile has never been enqueued.
* This value will always be populated before it is passed to a
@@ -61,6 +74,19 @@ public interface FlowFile extends Comparable<FlowFile> {
Long getLastQueueDate();
/**
+ * Returns a 64-bit integer that indicates the order in which the FlowFile was added to the
+ * FlowFile queue with respect to other FlowFiles that have the same last queue date.
+ * I.e., if two FlowFiles return the same value for {@link #getLastQueueDate()}, the order
+ * in which those FlowFiles were enqueued can be determined by looking at the result of
+ * this method. However, no guarantee is made by this method about the ordering of FlowFiles
+ * that have different values for the {@link #getLastQueueDate()} method.
+ *
+ * @return the index that can be used to compare two FlowFiles with the same last queue date
+ * to understand the order in which the two FlowFiles were enqueued.
+ */
+ long getQueueDateIndex();
+
+ /**
* <p>
* If a FlowFile is derived from multiple "parent" FlowFiles, all of the
* parents' Lineage Identifiers will be in the set.
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 41bcc74..049c65b 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -290,4 +290,14 @@ public class MockFlowFile implements FlowFileRecord {
public long getContentClaimOffset() {
return 0;
}
+
+ @Override
+ public long getLineageStartIndex() {
+ return 0;
+ }
+
+ @Override
+ public long getQueueDateIndex() {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 156389b..5234f0e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -74,7 +74,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
- public static final int SWAP_ENCODING_VERSION = 8;
+ public static final int SWAP_ENCODING_VERSION = 9;
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
@@ -328,7 +328,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
out.writeLong(flowFile.getLineageStartDate());
+ out.writeLong(flowFile.getLineageStartIndex());
out.writeLong(flowFile.getLastQueueDate());
+ out.writeLong(flowFile.getQueueDateIndex());
out.writeLong(flowFile.getSize());
final ContentClaim claim = flowFile.getContentClaim();
@@ -447,10 +449,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
lineageIdentifiers.add(in.readUTF());
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
- ffBuilder.lineageStartDate(in.readLong());
+
+ // version 9 adds in a 'lineage start index'
+ final long lineageStartDate = in.readLong();
+ final long lineageStartIndex;
+ if (serializationVersion > 8) {
+ lineageStartIndex = in.readLong();
+ } else {
+ lineageStartIndex = 0L;
+ }
+
+ ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
if (serializationVersion > 5) {
- ffBuilder.lastQueueDate(in.readLong());
+ // Version 9 adds in a 'queue date index'
+ final long lastQueueDate = in.readLong();
+ final long queueDateIndex;
+ if (serializationVersion > 8) {
+ queueDateIndex = in.readLong();
+ } else {
+ queueDateIndex = 0L;
+ }
+
+ ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 3d2eca2..8c710fa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3671,7 +3671,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
.entryDate(System.currentTimeMillis())
.id(flowFileRepository.getNextFlowFileSequence())
.lineageIdentifiers(lineageIdentifiers)
- .lineageStartDate(event.getLineageStartDate())
+ .lineageStart(event.getLineageStartDate(), 0L)
.size(contentSize.longValue())
// Create a new UUID and add attributes indicating that this is a replay
.addAttribute("flowfile.replay", "true")
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index d3d8d40..cd1ba14 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -48,6 +48,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private final long id;
private final long entryDate;
private final long lineageStartDate;
+ private final long lineageStartIndex;
private final Set<String> lineageIdentifiers;
private final long size;
private final long penaltyExpirationMs;
@@ -55,18 +56,21 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private final ContentClaim claim;
private final long claimOffset;
private final long lastQueueDate;
+ private final long queueDateIndex;
private StandardFlowFileRecord(final Builder builder) {
this.id = builder.bId;
this.attributes = builder.bAttributes;
this.entryDate = builder.bEntryDate;
this.lineageStartDate = builder.bLineageStartDate;
+ this.lineageStartIndex = builder.bLineageStartIndex;
this.lineageIdentifiers = builder.bLineageIdentifiers;
this.penaltyExpirationMs = builder.bPenaltyExpirationMs;
this.size = builder.bSize;
this.claim = builder.bClaim;
this.claimOffset = builder.bClaimOffset;
this.lastQueueDate = builder.bLastQueueDate;
+ this.queueDateIndex = builder.bQueueDateIndex;
}
@Override
@@ -124,6 +128,16 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return this.claimOffset;
}
+ @Override
+ public long getLineageStartIndex() {
+ return lineageStartIndex;
+ }
+
+ @Override
+ public long getQueueDateIndex() {
+ return queueDateIndex;
+ }
+
/**
* Provides the natural ordering for FlowFile objects which is based on their identifier.
*
@@ -167,6 +181,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private long bId;
private long bEntryDate = System.currentTimeMillis();
private long bLineageStartDate = bEntryDate;
+ private long bLineageStartIndex = 0L;
private final Set<String> bLineageIdentifiers = new HashSet<>();
private long bPenaltyExpirationMs = -1L;
private long bSize = 0L;
@@ -174,6 +189,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
private ContentClaim bClaim = null;
private long bClaimOffset = 0L;
private long bLastQueueDate = System.currentTimeMillis();
+ private long bQueueDateIndex = 0L;
public Builder id(final long id) {
bId = id;
@@ -204,8 +220,9 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return this;
}
- public Builder lineageStartDate(final long lineageStartDate) {
+ public Builder lineageStart(final long lineageStartDate, final long lineageStartIndex) {
bLineageStartDate = lineageStartDate;
+ bLineageStartIndex = lineageStartIndex;
return this;
}
@@ -298,8 +315,9 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
return this;
}
- public Builder lastQueueDate(final long lastQueueDate) {
+ public Builder lastQueued(final long lastQueueDate, final long queueDateIndex) {
this.bLastQueueDate = lastQueueDate;
+ this.bQueueDateIndex = queueDateIndex;
return this;
}
@@ -310,6 +328,7 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
bId = specFlowFile.getId();
bEntryDate = specFlowFile.getEntryDate();
bLineageStartDate = specFlowFile.getLineageStartDate();
+ bLineageStartIndex = specFlowFile.getLineageStartIndex();
bLineageIdentifiers.clear();
bLineageIdentifiers.addAll(specFlowFile.getLineageIdentifiers());
bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis();
@@ -317,6 +336,8 @@ public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord {
bAttributes.putAll(specFlowFile.getAttributes());
bClaim = specFlowFile.getContentClaim();
bClaimOffset = specFlowFile.getContentClaimOffset();
+ bLastQueueDate = specFlowFile.getLastQueueDate();
+ bQueueDateIndex = specFlowFile.getQueueDateIndex();
return this;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 062e515..f6eca71 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -92,6 +92,7 @@ import org.slf4j.LoggerFactory;
public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
private static final AtomicLong idGenerator = new AtomicLong(0L);
+ private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
// determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
public static final int VERBOSE_LOG_THRESHOLD = 10;
@@ -1526,11 +1527,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return newFile;
}
+ private void updateLastQueuedDate(final StandardRepositoryRecord record) {
+ final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
+ .lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build();
+ record.setWorking(newFile);
+ }
+
@Override
public void transfer(final FlowFile flowFile, final Relationship relationship) {
validateRecordState(flowFile);
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
+ updateLastQueuedDate(record);
final int numDestinations = context.getConnections(relationship).size();
final int multiplier = Math.max(1, numDestinations);
@@ -1560,6 +1568,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
}
record.setTransferRelationship(Relationship.SELF);
+ updateLastQueuedDate(record);
}
@Override
@@ -1589,6 +1598,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
for (final FlowFile flowFile : flowFiles) {
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
+ updateLastQueuedDate(record);
+
contentSize += flowFile.getSize() * multiplier;
}
@@ -2482,7 +2493,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Set<String> lineageIdentifiers = new HashSet<>(parent.getLineageIdentifiers());
lineageIdentifiers.add(parent.getAttribute(CoreAttributes.UUID.key()));
fFileBuilder.lineageIdentifiers(lineageIdentifiers);
- fFileBuilder.lineageStartDate(parent.getLineageStartDate());
+ fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
fFileBuilder.addAttributes(newAttributes);
final FlowFileRecord fFile = fFileBuilder.build();
@@ -2516,6 +2527,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
+ // find the smallest lineage start index that has the same lineage start date as the one we've chosen.
+ long lineageStartIndex = 0L;
+ for (final FlowFile parent : parents) {
+ if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) {
+ lineageStartIndex = parent.getLineageStartIndex();
+ }
+ }
+
newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
@@ -2523,7 +2542,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
.addAttributes(newAttributes)
.lineageIdentifiers(lineageIdentifiers)
- .lineageStartDate(lineageStartDate)
+ .lineageStart(lineageStartDate, lineageStartIndex)
.build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 211baa7..0f40cbb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -388,6 +388,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
private static class WriteAheadRecordSerde implements SerDe<RepositoryRecord> {
+ private static final int CURRENT_ENCODING_VERSION = 8;
public static final byte ACTION_CREATE = 0;
public static final byte ACTION_UPDATE = 1;
@@ -474,9 +475,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
out.writeLong(flowFile.getLineageStartDate());
+ out.writeLong(flowFile.getLineageStartIndex());
final Long queueDate = flowFile.getLastQueueDate();
out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
+ out.writeLong(flowFile.getQueueDateIndex());
out.writeLong(flowFile.getSize());
if (associatedQueue == null) {
@@ -552,10 +555,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
lineageIdentifiers.add(in.readUTF());
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
- ffBuilder.lineageStartDate(in.readLong());
+
+ final long lineageStartDate = in.readLong();
+ final long lineageStartIndex;
+ if (version > 7) {
+ lineageStartIndex = in.readLong();
+ } else {
+ lineageStartIndex = 0L;
+ }
+ ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
if (version > 5) {
- ffBuilder.lastQueueDate(in.readLong());
+ final long lastQueueDate = in.readLong();
+ final long queueDateIndex;
+ if (version > 7) {
+ queueDateIndex = in.readLong();
+ } else {
+ queueDateIndex = 0L;
+ }
+
+ ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
}
}
@@ -648,10 +667,26 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
lineageIdentifiers.add(in.readUTF());
}
ffBuilder.lineageIdentifiers(lineageIdentifiers);
- ffBuilder.lineageStartDate(in.readLong());
+
+ final long lineageStartDate = in.readLong();
+ final long lineageStartIndex;
+ if (version > 7) {
+ lineageStartIndex = in.readLong();
+ } else {
+ lineageStartIndex = 0L;
+ }
+ ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
if (version > 5) {
- ffBuilder.lastQueueDate(in.readLong());
+ final long lastQueueDate = in.readLong();
+ final long queueDateIndex;
+ if (version > 7) {
+ queueDateIndex = in.readLong();
+ } else {
+ queueDateIndex = 0L;
+ }
+
+ ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
}
}
@@ -872,7 +907,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
@Override
public int getVersion() {
- return 7;
+ return CURRENT_ENCODING_VERSION;
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index fcfd524..c27fad3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -242,5 +242,15 @@ public class TestFileSystemSwapManager {
public long getContentClaimOffset() {
return 0;
}
+
+ @Override
+ public long getLineageStartIndex() {
+ return 0;
+ }
+
+ @Override
+ public long getQueueDateIndex() {
+ return 0;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 18c55c6..85d9838 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -692,6 +692,16 @@ public class TestStandardFlowFileQueue {
public long getContentClaimOffset() {
return 0;
}
+
+ @Override
+ public long getLineageStartIndex() {
+ return 0;
+ }
+
+ @Override
+ public long getQueueDateIndex() {
+ return 0;
+ }
}
private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
index 26766d6..a34d78b 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestUtil.java
@@ -77,6 +77,16 @@ public class TestUtil {
public int compareTo(final FlowFile o) {
return 0;
}
+
+ @Override
+ public long getLineageStartIndex() {
+ return 0;
+ }
+
+ @Override
+ public long getQueueDateIndex() {
+ return 0;
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
index 7a7a334..4daa31c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/test/java/org/apache/nifi/provenance/TestVolatileProvenanceRepository.java
@@ -172,6 +172,16 @@ public class TestVolatileProvenanceRepository {
public Long getLastQueueDate() {
return System.currentTimeMillis();
}
+
+ @Override
+ public long getLineageStartIndex() {
+ return 0;
+ }
+
+ @Override
+ public long getQueueDateIndex() {
+ return 0;
+ }
};
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java
index 6107e87..08437c7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/FirstInFirstOutPrioritizer.java
@@ -31,7 +31,12 @@ public class FirstInFirstOutPrioritizer implements FlowFilePrioritizer {
return 1;
}
- return o1.getLastQueueDate().compareTo(o2.getLastQueueDate());
+ final int dateComparison = o1.getLastQueueDate().compareTo(o2.getLastQueueDate());
+ if (dateComparison != 0) {
+ return dateComparison;
+ }
+
+ return Long.compare(o1.getQueueDateIndex(), o2.getQueueDateIndex());
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java
index 515993e..4893cf0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/NewestFlowFileFirstPrioritizer.java
@@ -36,7 +36,7 @@ public class NewestFlowFileFirstPrioritizer implements FlowFilePrioritizer {
return lineageDateCompare;
}
- return Long.compare(o2.getId(), o1.getId());
+ return Long.compare(o2.getLineageStartIndex(), o1.getLineageStartIndex());
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32b8a9b9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
index b4781f3..386d912 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-prioritizers/src/main/java/org/apache/nifi/prioritizer/OldestFlowFileFirstPrioritizer.java
@@ -36,7 +36,7 @@ public class OldestFlowFileFirstPrioritizer implements FlowFilePrioritizer {
return lineageDateCompare;
}
- return Long.compare(o1.getId(), o2.getId());
+ return Long.compare(o1.getLineageStartIndex(), o2.getLineageStartIndex());
}
}