You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/06/22 21:03:34 UTC

nifi git commit: NIFI-4017: Emit provenance event from Notify.

Repository: nifi
Updated Branches:
  refs/heads/master f54e14656 -> 5c755c006


NIFI-4017: Emit provenance event from Notify.

This closes #1890.

Signed-off-by: Andy LoPresto <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5c755c00
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5c755c00
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5c755c00

Branch: refs/heads/master
Commit: 5c755c006b37c0b65cce930d291c747155f58916
Parents: f54e146
Author: Koji Kawamura <ij...@apache.org>
Authored: Mon Jun 5 18:41:56 2017 +0900
Committer: Andy LoPresto <al...@apache.org>
Committed: Thu Jun 22 16:49:31 2017 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/processors/standard/Notify.java    | 13 ++++++++++---
 .../apache/nifi/processors/standard/TestNotify.java    |  7 +++++++
 2 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5c755c00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
index 346f1fb..4493390 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
@@ -30,6 +30,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -53,10 +54,14 @@ import org.apache.nifi.processor.util.StandardValidators;
 @CapabilityDescription("Caches a release signal identifier in the distributed cache, optionally along with "
         + "the FlowFile's attributes.  Any flow files held at a corresponding Wait processor will be "
         + "released once this signal in the cache is discovered.")
+@WritesAttribute(attribute = "notified", description = "All FlowFiles will have an attribute 'notified'. The value of this " +
+        "attribute is true if the FlowFile is notified, otherwise false.")
 @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
         "org.apache.nifi.processors.standard.Wait"})
 public class Notify extends AbstractProcessor {
 
+    public static final String NOTIFIED_ATTRIBUTE_NAME = "notified";
+
     // Identifies the distributed map cache client
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
             .name("distributed-cache-service")
@@ -210,7 +215,8 @@ public class Notify extends AbstractProcessor {
             // if the computed value is null, or empty, we transfer the flow file to failure relationship
             if (StringUtils.isBlank(signalId)) {
                 logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
-                session.transfer(flowFile, REL_FAILURE);
+                // set 'notified' attribute
+                session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
                 continue;
             }
 
@@ -226,7 +232,7 @@ public class Notify extends AbstractProcessor {
                     delta = Integer.parseInt(deltaStr);
                 } catch (final NumberFormatException e) {
                     logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[] {flowFile, e}, e);
-                    session.transfer(flowFile, REL_FAILURE);
+                    session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(false)), REL_FAILURE);
                     continue;
                 }
             }
@@ -256,7 +262,8 @@ public class Notify extends AbstractProcessor {
             // retry after yielding for a while.
             try {
                 protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache);
-                session.transfer(signalBuffer.flowFiles, REL_SUCCESS);
+                signalBuffer.flowFiles.forEach(flowFile ->
+                        session.transfer(session.putAttribute(flowFile, NOTIFIED_ATTRIBUTE_NAME, String.valueOf(true)), REL_SUCCESS));
             } catch (IOException e) {
                 throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", signalId, e), e);
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/5c755c00/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
index 3e0cd68..ab99af9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java
@@ -72,6 +72,7 @@ public class TestNotify {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).get(0).assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true");
         runner.clearTransferState();
 
         final Signal signal = new WaitNotifyProtocol(service).getSignal("1");
@@ -107,6 +108,7 @@ public class TestNotify {
         runner.run(3);
 
         runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
+        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
         runner.clearTransferState();
 
         final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
@@ -146,6 +148,7 @@ public class TestNotify {
 
         // Limited by the buffer count
         runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
         runner.clearTransferState();
 
         Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
@@ -158,6 +161,7 @@ public class TestNotify {
         // Run it again, and it should process remaining one flow file.
         runner.run();
         runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
         runner.clearTransferState();
 
         signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
@@ -201,6 +205,7 @@ public class TestNotify {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
+        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
         runner.clearTransferState();
 
         final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
@@ -244,7 +249,9 @@ public class TestNotify {
 
         // Only failed records should be transferred to failure.
         runner.assertTransferCount(Notify.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "true"));
         runner.assertTransferCount(Notify.REL_FAILURE, 1);
+        runner.getFlowFilesForRelationship(Notify.REL_FAILURE).forEach(flowFile -> flowFile.assertAttributeEquals(Notify.NOTIFIED_ATTRIBUTE_NAME, "false"));
         runner.clearTransferState();
 
         final Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");