You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/06/01 18:09:06 UTC
[nifi] branch master updated: NIFI-7477 Optionally adding validation details as a new attribute of the flowfile
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 04711ab NIFI-7477 Optionally adding validation details as a new attribute of the flowfile
04711ab is described below
commit 04711ab466a2b2c8c3b618f7617fc243c6576156
Author: jahenaor <ja...@gmail.com>
AuthorDate: Wed May 27 22:55:25 2020 -0500
NIFI-7477 Optionally adding validation details as a new attribute of the flowfile
NIFI-7477 Improving description and unit test now verifies attribute content
NIFI-7477: Fixed checkstyle errors
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #4301
---
.../nifi/processors/standard/ValidateRecord.java | 46 ++++++++++++++++++++--
.../processors/standard/TestValidateRecord.java | 46 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 3 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
index b3255e6..1210eec 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
@@ -180,6 +180,26 @@ public class ValidateRecord extends AbstractProcessor {
.defaultValue("true")
.required(true)
.build();
+ static final PropertyDescriptor VALIDATION_DETAILS_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+ .name("validation-details-attribute-name")
+ .displayName("Validation Details Attribute Name")
+ .description("If specified, when a validation error occurs, this attribute name will be used to leave the details. The number of characters will be limited "
+ + "by the property 'Maximum Validation Details Length'.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+ .defaultValue(null)
+ .build();
+ static final PropertyDescriptor MAX_VALIDATION_DETAILS_LENGTH = new PropertyDescriptor.Builder()
+ .name("maximum-validation-details-length")
+ .displayName("Maximum Validation Details Length")
+ .description("Specifies the maximum number of characters that validation details value can have. Any characters beyond the max will be truncated. "
+ + "This property is only used if 'Validation Details Attribute Name' is set")
+ .required(false)
+ .defaultValue("1024")
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
static final Relationship REL_VALID = new Relationship.Builder()
.name("valid")
@@ -207,6 +227,8 @@ public class ValidateRecord extends AbstractProcessor {
properties.add(SCHEMA_TEXT);
properties.add(ALLOW_EXTRA_FIELDS);
properties.add(STRICT_TYPE_CHECKING);
+ properties.add(VALIDATION_DETAILS_ATTRIBUTE_NAME);
+ properties.add(MAX_VALIDATION_DETAILS_LENGTH);
return properties;
}
@@ -350,7 +372,7 @@ public class ValidateRecord extends AbstractProcessor {
}
if (validWriter != null) {
- completeFlowFile(session, validFlowFile, validWriter, REL_VALID, null);
+ completeFlowFile(context, session, validFlowFile, validWriter, REL_VALID, null);
}
if (invalidWriter != null) {
@@ -389,7 +411,7 @@ public class ValidateRecord extends AbstractProcessor {
}
final String validationErrorString = errorBuilder.toString();
- completeFlowFile(session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString);
+ completeFlowFile(context, session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString);
}
} finally {
closeQuietly(validWriter);
@@ -424,14 +446,32 @@ public class ValidateRecord extends AbstractProcessor {
}
}
- private void completeFlowFile(final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer, final Relationship relationship, final String details) throws IOException {
+ private void completeFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer,
+ final Relationship relationship, final String details) throws IOException {
final WriteResult writeResult = writer.finishRecordSet();
writer.close();
+ final String validationDetailsAttributeName = context.getProperty(VALIDATION_DETAILS_ATTRIBUTE_NAME)
+ .evaluateAttributeExpressions(flowFile).getValue();
+
+ final Integer maxValidationDetailsLength = context.getProperty(MAX_VALIDATION_DETAILS_LENGTH).evaluateAttributeExpressions(flowFile).asInteger();
+
final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+ if(validationDetailsAttributeName != null && details != null && !details.isEmpty()) {
+ String truncatedDetails = details;
+
+ //Truncating only when it exceeds the configured maximum
+ if (truncatedDetails.length() > maxValidationDetailsLength) {
+ truncatedDetails = truncatedDetails.substring(0, maxValidationDetailsLength);
+ }
+
+ attributes.put(validationDetailsAttributeName, truncatedDetails);
+ }
+
session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, relationship);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
index 74c3a17..895e3de 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
@@ -594,4 +594,50 @@ public class TestValidateRecord {
}
}
+ @Test
+ public void testValidationsDetailsAttributeForInvalidRecords() throws InitializationException, UnsupportedEncodingException, IOException {
+ final String schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc")), "UTF-8");
+
+ final CSVReader csvReader = new CSVReader();
+ runner.addControllerService("reader", csvReader);
+ runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_TEXT, schema);
+ runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "false");
+ runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
+ runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
+ runner.enableControllerService(csvReader);
+
+ final MockRecordWriter validWriter = new MockRecordWriter("valid", false);
+ runner.addControllerService("writer", validWriter);
+ runner.enableControllerService(validWriter);
+
+ final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
+ runner.addControllerService("invalid-writer", invalidWriter);
+ runner.enableControllerService(invalidWriter);
+
+ runner.setProperty(ValidateRecord.RECORD_READER, "reader");
+ runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
+ runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
+ runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
+ runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "150");
+ runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails");
+
+ final String content = "1, John Doe\n"
+ + "2, Jane Doe\n"
+ + "Three, Jack Doe\n";
+
+ runner.enqueue(content);
+ runner.run();
+
+ runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
+ runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
+
+ final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
+ invalidFlowFile.assertAttributeEquals("record.count", "1");
+ invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
+ invalidFlowFile.assertAttributeExists("valDetails");
+ invalidFlowFile.assertAttributeEquals("valDetails", "Records in this FlowFile were invalid for the following reasons: ; "
+ + "The following 1 fields had values whose type did not match the schema: [/id]");
+ }
+
}