You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/07/07 11:58:10 UTC
nifi git commit: NIFI-5384 FlowFile's queued in batches should all have the same Queue time
Repository: nifi
Updated Branches:
refs/heads/master 0c5e159eb -> d50e3f174
NIFI-5384 FlowFile's queued in batches should all have the same Queue time
This closes #2849
Signed-off-by: Mike Thomsen <mi...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d50e3f17
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d50e3f17
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d50e3f17
Branch: refs/heads/master
Commit: d50e3f17471e4ade3373b9e95e5c6a8e364248a6
Parents: 0c5e159
Author: patricker <pa...@gmail.com>
Authored: Fri Jul 6 10:38:04 2018 -0600
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Sat Jul 7 07:56:53 2018 -0400
----------------------------------------------------------------------
.../repository/StandardProcessSession.java | 11 ++++++---
.../repository/TestStandardProcessSession.java | 25 ++++++++++++++++++++
2 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d50e3f17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 12bcafd..ea4969c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1861,12 +1861,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return newFile;
}
- private void updateLastQueuedDate(final StandardRepositoryRecord record) {
+ private void updateLastQueuedDate(final StandardRepositoryRecord record, final Long lastQueueDate) {
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
- .lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build();
+ .lastQueued(lastQueueDate, enqueuedIndex.getAndIncrement()).build();
record.setWorking(newFile);
}
+ private void updateLastQueuedDate(final StandardRepositoryRecord record) {
+ updateLastQueuedDate(record, System.currentTimeMillis());
+ }
+
@Override
public void transfer(FlowFile flowFile, final Relationship relationship) {
verifyTaskActive();
@@ -1938,11 +1942,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final int multiplier = Math.max(1, numDestinations);
+ final long queuedTime = System.currentTimeMillis();
long contentSize = 0L;
for (final FlowFile flowFile : flowFiles) {
final StandardRepositoryRecord record = records.get(flowFile);
record.setTransferRelationship(relationship);
- updateLastQueuedDate(record);
+ updateLastQueuedDate(record, queuedTime);
contentSize += flowFile.getSize();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d50e3f17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index f47be4d..4059557 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -1651,6 +1652,30 @@ public class TestStandardProcessSession {
}
@Test
+ public void testBatchQueuedHaveSameQueuedTime() {
+ for (int i = 0; i < 100; i++) {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(i)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-0000000" + i)
+ .build();
+ this.flowFileQueue.put(flowFile);
+ }
+
+ final List<FlowFile> flowFiles = session.get(100);
+
+ // FlowFile Queued times should not match yet
+ assertNotEquals("Queued times should not be equal.", flowFiles.get(0).getLastQueueDate(), flowFiles.get(99).getLastQueueDate());
+
+ session.transfer(flowFiles, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ final List<FlowFile> flowFilesUpdated = session.get(100);
+
+ // FlowFile Queued times should match
+ assertEquals("Queued times should be equal.", flowFilesUpdated.get(0).getLastQueueDate(), flowFilesUpdated.get(99).getLastQueueDate());
+ }
+
+ @Test
public void testAttributesModifiedEmitted() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
.id(1L)