You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/07/07 11:58:10 UTC

nifi git commit: NIFI-5384 FlowFile's queued in batches should all have the same Queue time

Repository: nifi
Updated Branches:
  refs/heads/master 0c5e159eb -> d50e3f174


NIFI-5384 FlowFile's queued in batches should all have the same Queue time

This closes #2849

Signed-off-by: Mike Thomsen <mi...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d50e3f17
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d50e3f17
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d50e3f17

Branch: refs/heads/master
Commit: d50e3f17471e4ade3373b9e95e5c6a8e364248a6
Parents: 0c5e159
Author: patricker <pa...@gmail.com>
Authored: Fri Jul 6 10:38:04 2018 -0600
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Sat Jul 7 07:56:53 2018 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 11 ++++++---
 .../repository/TestStandardProcessSession.java  | 25 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d50e3f17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 12bcafd..ea4969c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1861,12 +1861,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         return newFile;
     }
 
-    private void updateLastQueuedDate(final StandardRepositoryRecord record) {
+    private void updateLastQueuedDate(final StandardRepositoryRecord record, final Long lastQueueDate) {
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
-            .lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement()).build();
+                .lastQueued(lastQueueDate, enqueuedIndex.getAndIncrement()).build();
         record.setWorking(newFile);
     }
 
+    private void updateLastQueuedDate(final StandardRepositoryRecord record) {
+        updateLastQueuedDate(record, System.currentTimeMillis());
+    }
+
     @Override
     public void transfer(FlowFile flowFile, final Relationship relationship) {
         verifyTaskActive();
@@ -1938,11 +1942,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final int multiplier = Math.max(1, numDestinations);
 
+        final long queuedTime = System.currentTimeMillis();
         long contentSize = 0L;
         for (final FlowFile flowFile : flowFiles) {
             final StandardRepositoryRecord record = records.get(flowFile);
             record.setTransferRelationship(relationship);
-            updateLastQueuedDate(record);
+            updateLastQueuedDate(record, queuedTime);
 
             contentSize += flowFile.getSize();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d50e3f17/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index f47be4d..4059557 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -1651,6 +1652,30 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testBatchQueuedHaveSameQueuedTime() {
+        for (int i = 0; i < 100; i++) {
+            final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+                    .id(i)
+                    .addAttribute("uuid", "000000000000-0000-0000-0000-0000000" + i)
+                    .build();
+            this.flowFileQueue.put(flowFile);
+        }
+
+        final List<FlowFile> flowFiles = session.get(100);
+
+        // FlowFile Queued times should not match yet
+        assertNotEquals("Queued times should not be equal.", flowFiles.get(0).getLastQueueDate(), flowFiles.get(99).getLastQueueDate());
+
+        session.transfer(flowFiles, new Relationship.Builder().name("A").build());
+        session.commit();
+
+        final List<FlowFile> flowFilesUpdated = session.get(100);
+
+        // FlowFile Queued times should match
+        assertEquals("Queued times should be equal.", flowFilesUpdated.get(0).getLastQueueDate(), flowFilesUpdated.get(99).getLastQueueDate());
+    }
+
+    @Test
     public void testAttributesModifiedEmitted() throws IOException {
         final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
                 .id(1L)