You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2020/04/16 06:57:14 UTC
[nifi] branch master updated: NIFI-7348 Wait - Removes
WAIT_START_TIMESTAMP after expiration
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 8e3f420 NIFI-7348 Wait - Removes WAIT_START_TIMESTAMP after expiration
8e3f420 is described below
commit 8e3f42051fb3c7da27873de8b6d4507827a7b80d
Author: EndzeitBegins <16...@users.noreply.github.com>
AuthorDate: Mon Apr 13 16:19:55 2020 +0200
NIFI-7348 Wait - Removes WAIT_START_TIMESTAMP after expiration
This closes #4201.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../org/apache/nifi/processors/standard/Wait.java | 4 +-
.../apache/nifi/processors/standard/TestWait.java | 62 ++++++++++++----------
2 files changed, 37 insertions(+), 29 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 45ffcb2..37f4479 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -87,7 +87,7 @@ import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE
@WritesAttributes({
@WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
+ "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile. "
- + "This attribute is not written when the FlowFile is transferred to failure or success"),
+ + "This attribute is not written when the FlowFile is transferred to failure, expired or success"),
@WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
+ "each count value in the signal is copied.")
})
@@ -375,7 +375,7 @@ public class Wait extends AbstractProcessor {
final Relationship finalRelationship = relationship;
final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
.map(f -> {
- if (REL_SUCCESS.equals(finalRelationship)) {
+ if (REL_SUCCESS.equals(finalRelationship) || REL_EXPIRED.equals(finalRelationship)) {
// These flowFiles will be exiting the wait, clear the timer
f = clearWaitState(session, f);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index 7970601..d8fffdc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -69,8 +69,8 @@ public class TestWait {
// no cache key attribute
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
- // timestamp must be present
- runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
runner.clearTransferState();
}
@@ -103,7 +103,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
- ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
runner.clearTransferState();
runner.enqueue(ff);
@@ -112,6 +112,8 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
+ ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
+ ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
runner.clearTransferState();
}
@@ -129,7 +131,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
- ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
runner.clearTransferState();
runner.enqueue(ff);
@@ -145,6 +147,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
ff = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
+ ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
// Even if wait didn't complete, signal attributes should be set
ff.assertAttributeEquals("wait.counter.total", "3");
ff.assertAttributeEquals("wait.counter.counter-A", "1");
@@ -161,13 +164,14 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "1");
- props.put("wait.start.timestamp", "blue bunny");
+ props.put(Wait.WAIT_START_TIMESTAMP, "blue bunny");
runner.enqueue(new byte[]{}, props);
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
- runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0);
+ ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
runner.clearTransferState();
}
@@ -181,8 +185,10 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
- runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
- runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0);
+ ff.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
+ ff.assertAttributeNotExists("wait.counter.total");
+
runner.clearTransferState();
}
@@ -217,6 +223,8 @@ public class TestWait {
runner.run(1, false);
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
runner.clearTransferState();
// The signal id should be penalized
@@ -264,9 +272,8 @@ public class TestWait {
runner.assertTransferCount(Wait.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
- // timer cleared
- outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there
@@ -307,9 +314,8 @@ public class TestWait {
runner.assertTransferCount(Wait.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
- // timer cleared
- outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there
@@ -348,7 +354,9 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
- waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+ String initialTimestamp = waitingFlowFile.getAttribute(Wait.WAIT_START_TIMESTAMP);
+
/*
* 2nd iteration.
*/
@@ -362,7 +370,8 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
- waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+ waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant
/*
* 3rd iteration.
@@ -374,7 +383,8 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
- waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+ waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant
/*
* 4th iteration.
@@ -389,9 +399,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
-
- // wait timer cleared
- outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
// show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
@@ -434,7 +442,8 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
- waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+ String initialTimestamp = waitingFlowFile.getAttribute(Wait.WAIT_START_TIMESTAMP);
/*
* 2nd iteration.
@@ -449,7 +458,8 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since counter-B doesn't reach to 2.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
- waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+ waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant
/*
* 3rd iteration.
@@ -464,7 +474,8 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
- waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be set
+ waitingFlowFile.assertAttributeEquals(Wait.WAIT_START_TIMESTAMP, initialTimestamp); // timestamp must remain constant
/*
* 4th iteration.
@@ -479,9 +490,8 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
- // wait timer cleared
- outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there
@@ -534,8 +544,7 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
- // timer cleared
- outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
// expect counter to be decremented to 0 and releasable count remains 1.
@@ -552,8 +561,7 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
- // timer cleared
- outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // timestamp must be cleared
// All counters are consumed.
outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");