You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/06/08 13:47:19 UTC
nifi git commit: NIFI-5264 - Added attribute for validation error message in ValidateCSV
Repository: nifi
Updated Branches:
refs/heads/master 49228aa5d -> 6e067734d
NIFI-5264 - Added attribute for validation error message in ValidateCSV
This closes #2769
Signed-off-by: zenfenan <ze...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6e067734
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6e067734
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6e067734
Branch: refs/heads/master
Commit: 6e067734d5bed7dbabc4af2dae1f23bb980e3957
Parents: 49228aa
Author: Pierre Villard <pi...@gmail.com>
Authored: Thu Jun 7 15:03:16 2018 +0200
Committer: zenfenan <si...@gmail.com>
Committed: Fri Jun 8 19:16:20 2018 +0530
----------------------------------------------------------------------
.../apache/nifi/processors/standard/ValidateCsv.java | 12 +++++++++++-
.../nifi/processors/standard/TestValidateCsv.java | 4 ++++
2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e067734/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
index 667d4f9..9796822 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java
@@ -92,7 +92,8 @@ import org.supercsv.prefs.CsvPreference;
@WritesAttributes({
@WritesAttribute(attribute="count.valid.lines", description="If line by line validation, number of valid lines extracted from the source data"),
@WritesAttribute(attribute="count.invalid.lines", description="If line by line validation, number of invalid lines extracted from the source data"),
- @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data")
+ @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data"),
+ @WritesAttribute(attribute="validation.error.message", description="For flow files routed to invalid, message of the first validation error")
})
public class ValidateCsv extends AbstractProcessor {
@@ -455,6 +456,7 @@ public class ValidateCsv extends AbstractProcessor {
final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
+ final AtomicReference<String> validationError = new AtomicReference<String>(null);
if(!isWholeFFValidation) {
invalidFF.set(session.create(flowFile));
@@ -514,6 +516,7 @@ public class ValidateCsv extends AbstractProcessor {
} catch (final SuperCsvException e) {
valid.set(false);
if(isWholeFFValidation) {
+ validationError.set(e.getLocalizedMessage());
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
break;
} else {
@@ -528,6 +531,10 @@ public class ValidateCsv extends AbstractProcessor {
if(isFirstLineInvalid.get()) {
isFirstLineInvalid.set(false);
}
+
+ if(validationError.get() == null) {
+ validationError.set(e.getLocalizedMessage());
+ }
}
} finally {
if(!isWholeFFValidation) {
@@ -554,6 +561,7 @@ public class ValidateCsv extends AbstractProcessor {
session.transfer(flowFile, REL_VALID);
} else {
session.getProvenanceReporter().route(flowFile, REL_INVALID);
+ session.putAttribute(flowFile, "validation.error.message", validationError.get());
session.transfer(flowFile, REL_INVALID);
}
} else {
@@ -578,6 +586,7 @@ public class ValidateCsv extends AbstractProcessor {
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get())));
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
+ session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
session.transfer(invalidFF.get(), REL_INVALID);
session.remove(flowFile);
} else {
@@ -585,6 +594,7 @@ public class ValidateCsv extends AbstractProcessor {
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
+ session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
session.transfer(invalidFF.get(), REL_INVALID);
session.remove(validFF.get());
session.remove(flowFile);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6e067734/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
index 097aad8..b03aed4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java
@@ -121,6 +121,8 @@ public class TestValidateCsv {
runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message",
+ "'22/111954' could not be parsed as a Date");
}
@Test
@@ -197,6 +199,8 @@ public class TestValidateCsv {
runner.enqueue("test,test,testapache.org");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
+ runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message",
+ "'testapache.org' does not match the regular expression '[a-z0-9\\._]+@[a-z0-9\\.]+'");
}
@Test