You are viewing a plain text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@nifi.apache.org by ij...@apache.org on 2020/04/16 06:57:14 UTC

[nifi] branch master updated: NIFI-7348 Wait - Removes WAIT_START_TIMESTAMP after expiration

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e3f420  NIFI-7348 Wait - Removes WAIT_START_TIMESTAMP after expiration
8e3f420 is described below

commit 8e3f42051fb3c7da27873de8b6d4507827a7b80d
Author: EndzeitBegins <16...@users.noreply.github.com>
AuthorDate: Mon Apr 13 16:19:55 2020 +0200

    NIFI-7348 Wait - Removes WAIT_START_TIMESTAMP after expiration
    
    This closes #4201.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../org/apache/nifi/processors/standard/Wait.java  |  4 +-
 .../apache/nifi/processors/standard/TestWait.java  | 62 ++++++++++++----------
 2 files changed, 37 insertions(+), 29 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 45ffcb2..37f4479 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -87,7 +87,7 @@ import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE
 @WritesAttributes({
         @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
         + "initial epoch timestamp when the file first entered this processor.  This is used to determine the expiration time of the FlowFile.  "
-        + "This attribute is not written when the FlowFile is transferred to failure or success"),
+        + "This attribute is not written when the FlowFile is transferred to failure, expired or success"),
         @WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
         + "each count value in the signal is copied.")
 })
@@ -375,7 +375,7 @@ public class Wait extends AbstractProcessor {
             final Relationship finalRelationship = relationship;
             final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
                     .map(f -> {
-                        if (REL_SUCCESS.equals(finalRelationship)) {
+                        if (REL_SUCCESS.equals(finalRelationship) || REL_EXPIRED.equals(finalRelationship)) {
                             // These flowFiles will be exiting the wait, clear the timer
                             f = clearWaitState(session, f);
                         }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index 7970601..d8fffdc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -69,8 +69,8 @@ public class TestWait {
 
         // no cache key attribute
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
-        // timestamp must be present
-        runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
         runner.clearTransferState();
     }
 
@@ -103,7 +103,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
 
         runner.clearTransferState();
         runner.enqueue(ff);
@@ -112,6 +112,8 @@ public class TestWait {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
+        ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
+        ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
         runner.clearTransferState();
     }
 
@@ -129,7 +131,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
         runner.clearTransferState();
         runner.enqueue(ff);
 
@@ -145,6 +147,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
         ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
+        ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
         // Even if wait didn't complete, signal attributes should be set
         ff.assertAttributeEquals("wait.counter.total", "3");
         ff.assertAttributeEquals("wait.counter.counter-A", "1");
@@ -161,13 +164,14 @@ public class TestWait {
 
         final Map<String, String> props = new HashMap<>();
         props.put("releaseSignalAttribute", "1");
-        props.put("wait.start.timestamp", "blue bunny");
+        props.put(Wait.WAIT_START_TIMESTAMP, "blue bunny");
         runner.enqueue(new byte[]{}, props);
 
         runner.run();
 
         runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
-        runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+        MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0);
+        ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
         runner.clearTransferState();
     }
 
@@ -181,8 +185,10 @@ public class TestWait {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
-        runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
-        runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+        MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0);
+        ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
+        ff.assertAttributeNotExists("wait.counter.total");
+
         runner.clearTransferState();
     }
 
@@ -217,6 +223,8 @@ public class TestWait {
         runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+        MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
         runner.clearTransferState();
 
         // The signal id should be penalized
@@ -264,9 +272,8 @@ public class TestWait {
         runner.assertTransferCount(Wait.REL_SUCCESS, 1);
 
         final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
 
-        // timer cleared
-        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -307,9 +314,8 @@ public class TestWait {
         runner.assertTransferCount(Wait.REL_SUCCESS, 1);
 
         final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
 
-        // timer cleared
-        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -348,7 +354,9 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+        String initialTimestamp = waitingFlowFile.getAttribute(Wait.WAIT_START_TIMESTAMP);
+
         /*
          * 2nd iteration.
          */
@@ -362,7 +370,8 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+        waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant
 
         /*
          * 3rd iteration.
@@ -374,7 +383,8 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+        waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant
 
         /*
          * 4th iteration.
@@ -389,9 +399,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
 
         final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
-
-        // wait timer cleared
-        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);  // timestamp must be cleared
 
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
@@ -434,7 +442,8 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+        String initialTimestamp = waitingFlowFile.getAttribute(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 2nd iteration.
@@ -449,7 +458,8 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since counter-B doesn't reach to 2.
         waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+        waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant
 
         /*
          * 3rd iteration.
@@ -464,7 +474,8 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+        waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant
 
         /*
          * 4th iteration.
@@ -479,9 +490,8 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
 
         final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
 
-        // wait timer cleared
-        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -534,8 +544,7 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
         MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
-        // timer cleared
-        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
         outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
 
         // expect counter to be decremented to 0 and releasable count remains 1.
@@ -552,8 +561,7 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
         outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
-        // timer cleared
-        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
         // All counters are consumed.
         outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");