You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/06/08 13:47:19 UTC

nifi git commit: NIFI-5264 - Added attribute for validation error message in ValidateCSV

Repository: nifi
Updated Branches:
  refs/heads/master 49228aa5d -> 6e067734d


NIFI-5264 - Added attribute for validation error message in ValidateCSV

This closes #2769

Signed-off-by: zenfenan <ze...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6e067734
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6e067734
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6e067734

Branch: refs/heads/master
Commit: 6e067734d5bed7dbabc4af2dae1f23bb980e3957
Parents: 49228aa
Author: Pierre Villard <pi...@gmail.com>
Authored: Thu Jun 7 15:03:16 2018 +0200
Committer: zenfenan <si...@gmail.com>
Committed: Fri Jun 8 19:16:20 2018 +0530

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/ValidateCsv.java    | 12 +++++++++++-
 .../nifi/processors/standard/TestValidateCsv.java       |  4 ++++
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6e067734/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
index 667d4f9..9796822 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -92,7 +92,8 @@ import org.supercsv.prefs.CsvPreference;
 @WritesAttributes({
     @WritesAttribute(attribute="count.valid.lines", description="If line by line validation, number of valid lines extracted from the source data"),
     @WritesAttribute(attribute="count.invalid.lines", description="If line by line validation, number of invalid lines extracted from the source data"),
-    @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data")
+    @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data"),
+    @WritesAttribute(attribute="validation.error.message", description="For flow files routed to invalid, message of the first validation error")
 })
 public class ValidateCsv extends AbstractProcessor {
 
@@ -455,6 +456,7 @@ public class ValidateCsv extends AbstractProcessor {
         final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
         final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
         final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
+        final AtomicReference<String> validationError = new AtomicReference<String>(null);
 
         if(!isWholeFFValidation) {
             invalidFF.set(session.create(flowFile));
@@ -514,6 +516,7 @@ public class ValidateCsv extends AbstractProcessor {
                         } catch (final SuperCsvException e) {
                             valid.set(false);
                             if(isWholeFFValidation) {
+                                validationError.set(e.getLocalizedMessage());
                                 logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
                                 break;
                             } else {
@@ -528,6 +531,10 @@ public class ValidateCsv extends AbstractProcessor {
                                 if(isFirstLineInvalid.get()) {
                                     isFirstLineInvalid.set(false);
                                 }
+
+                                if(validationError.get() == null) {
+                                    validationError.set(e.getLocalizedMessage());
+                                }
                             }
                         } finally {
                             if(!isWholeFFValidation) {
@@ -554,6 +561,7 @@ public class ValidateCsv extends AbstractProcessor {
                 session.transfer(flowFile, REL_VALID);
             } else {
                 session.getProvenanceReporter().route(flowFile, REL_INVALID);
+                session.putAttribute(flowFile, "validation.error.message", validationError.get());
                 session.transfer(flowFile, REL_INVALID);
             }
         } else {
@@ -578,6 +586,7 @@ public class ValidateCsv extends AbstractProcessor {
                 session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
                 session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get())));
                 session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
+                session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
                 session.transfer(invalidFF.get(), REL_INVALID);
                 session.remove(flowFile);
             } else {
@@ -585,6 +594,7 @@ public class ValidateCsv extends AbstractProcessor {
                 session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
                 session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get()));
                 session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
+                session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
                 session.transfer(invalidFF.get(), REL_INVALID);
                 session.remove(validFF.get());
                 session.remove(flowFile);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6e067734/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
index 097aad8..b03aed4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -121,6 +121,8 @@ public class TestValidateCsv {
         runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
         runner.run();
         runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+        runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message",
+                "'22/111954' could not be parsed as a Date");
     }
 
     @Test
@@ -197,6 +199,8 @@ public class TestValidateCsv {
         runner.enqueue("test,test,testapache.org");
         runner.run();
         runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+        runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message",
+                "'testapache.org' does not match the regular expression '[a-z0-9\\._]+@[a-z0-9\\.]+'");
     }
 
     @Test