You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") in the original page and is not preserved in this copy.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/11/14 02:54:32 UTC

nifi git commit: NIFI-4577 - ValidateCsv - add attributes to indicate number of valid/invalid lines

Repository: nifi
Updated Branches:
  refs/heads/master af3a57871 -> 9e9c129c2


NIFI-4577 - ValidateCsv - add attributes to indicate number of valid/invalid lines

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2268


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9e9c129c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9e9c129c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9e9c129c

Branch: refs/heads/master
Commit: 9e9c129c21061f5c66ba4246774738de7184406b
Parents: af3a578
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Nov 10 22:18:44 2017 +0100
Committer: Matthew Burgess <ma...@apache.org>
Committed: Mon Nov 13 21:48:30 2017 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/ValidateCsv.java | 15 +++++++++++++++
 .../nifi/processors/standard/TestValidateCsv.java    |  8 ++++++--
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9e9c129c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
index 43d3ef9..6bb4205 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -35,6 +35,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -86,6 +88,11 @@ import org.supercsv.prefs.CsvPreference;
 @Tags({"csv", "schema", "validation"})
 @CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
         "Take a look at the additional documentation of this processor for some schema examples.")
+@WritesAttributes({
+    @WritesAttribute(attribute="count.valid.lines", description="If line by line validation, number of valid lines extracted from the source data"),
+    @WritesAttribute(attribute="count.invalid.lines", description="If line by line validation, number of invalid lines extracted from the source data"),
+    @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data")
+})
 public class ValidateCsv extends AbstractProcessor {
 
     private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
@@ -542,6 +549,8 @@ public class ValidateCsv extends AbstractProcessor {
             if (valid.get()) {
                 logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{validFF.get()});
                 session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid");
+                session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(totalCount.get()));
+                session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
                 session.transfer(validFF.get(), REL_VALID);
                 session.remove(invalidFF.get());
                 session.remove(flowFile);
@@ -552,13 +561,19 @@ public class ValidateCsv extends AbstractProcessor {
                 logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
                         new Object[]{okCount.get(), totalCount.get(), flowFile});
                 session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)");
+                session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
+                session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(okCount.get()));
                 session.transfer(validFF.get(), REL_VALID);
                 session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
+                session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get())));
+                session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
                 session.transfer(invalidFF.get(), REL_INVALID);
                 session.remove(flowFile);
             } else {
                 logger.debug("All lines in {} are invalid; routing to 'invalid'", new Object[]{invalidFF.get()});
                 session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
+                session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get()));
+                session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
                 session.transfer(invalidFF.get(), REL_INVALID);
                 session.remove(validFF.get());
                 session.remove(flowFile);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9e9c129c/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
index c2d4d3f..5092ba8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -90,14 +90,18 @@ public class TestValidateCsv {
 
         runner.setProperty(ValidateCsv.SCHEMA, "Unique()");
 
-        runner.enqueue("John\r\nBob\r\nBob\r\nJohn");
+        runner.enqueue("John\r\nBob\r\nBob\r\nJohn\r\nTom");
         runner.run();
 
         runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
         runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
 
-        runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob");
+        runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob\r\nTom");
+        runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertAttributeEquals("count.total.lines", "5");
+        runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertAttributeEquals("count.valid.lines", "3");
         runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Bob\r\nJohn");
+        runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("count.invalid.lines", "2");
+        runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("count.total.lines", "5");
     }
 
     @Test