You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/06/01 18:09:06 UTC

[nifi] branch master updated: NIFI-7477 Optionally adding validation details as a new attribute of the flowfile

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 04711ab  NIFI-7477 Optionally adding validation details as a new attribute of the flowfile
04711ab is described below

commit 04711ab466a2b2c8c3b618f7617fc243c6576156
Author: jahenaor <ja...@gmail.com>
AuthorDate: Wed May 27 22:55:25 2020 -0500

    NIFI-7477 Optionally adding validation details as a new attribute of the flowfile
    
    NIFI-7477 Improving description and unit test now verifies attribute content
    
    NIFI-7477: Fixed checkstyle errors
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #4301
---
 .../nifi/processors/standard/ValidateRecord.java   | 46 ++++++++++++++++++++--
 .../processors/standard/TestValidateRecord.java    | 46 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
index b3255e6..1210eec 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
@@ -180,6 +180,26 @@ public class ValidateRecord extends AbstractProcessor {
         .defaultValue("true")
         .required(true)
         .build();
+    static final PropertyDescriptor VALIDATION_DETAILS_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+        .name("validation-details-attribute-name")
+        .displayName("Validation Details Attribute Name")
+        .description("If specified, when a validation error occurs, this attribute name will be used to leave the details. "
+            + "The number of characters will be limited by the property 'Maximum Validation Details Length'.")
+        .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .build();
+    // NOTE(review): EL values may bypass the positive-integer validator, so the evaluated length could still be empty/invalid at runtime.
+    static final PropertyDescriptor MAX_VALIDATION_DETAILS_LENGTH = new PropertyDescriptor.Builder()
+        .name("maximum-validation-details-length")
+        .displayName("Maximum Validation Details Length")
+        .description("Specifies the maximum number of characters that validation details value can have. "
+            + "Any characters beyond the max will be truncated. This property is only used if 'Validation Details Attribute Name' is set")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .defaultValue("1024")
+        .required(false)
+        .build();
 
     static final Relationship REL_VALID = new Relationship.Builder()
         .name("valid")
@@ -207,6 +227,8 @@ public class ValidateRecord extends AbstractProcessor {
         properties.add(SCHEMA_TEXT);
         properties.add(ALLOW_EXTRA_FIELDS);
         properties.add(STRICT_TYPE_CHECKING);
+        properties.add(VALIDATION_DETAILS_ATTRIBUTE_NAME);
+        properties.add(MAX_VALIDATION_DETAILS_LENGTH);
         return properties;
     }
 
@@ -350,7 +372,7 @@ public class ValidateRecord extends AbstractProcessor {
                 }
 
                 if (validWriter != null) {
-                    completeFlowFile(session, validFlowFile, validWriter, REL_VALID, null);
+                    completeFlowFile(context, session, validFlowFile, validWriter, REL_VALID, null);
                 }
 
                 if (invalidWriter != null) {
@@ -389,7 +411,7 @@ public class ValidateRecord extends AbstractProcessor {
                     }
 
                     final String validationErrorString = errorBuilder.toString();
-                    completeFlowFile(session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString);
+                    completeFlowFile(context, session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString);
                 }
             } finally {
                 closeQuietly(validWriter);
@@ -424,14 +446,32 @@ public class ValidateRecord extends AbstractProcessor {
         }
     }
 
-    private void completeFlowFile(final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer, final Relationship relationship, final String details) throws IOException {
+    private void completeFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer,
+            final Relationship relationship, final String details) throws IOException {
         final WriteResult writeResult = writer.finishRecordSet();
         writer.close();
 
+        final String validationDetailsAttributeName = context.getProperty(VALIDATION_DETAILS_ATTRIBUTE_NAME)
+                .evaluateAttributeExpressions(flowFile).getValue();
+
+        final Integer maxValidationDetailsLength = context.getProperty(MAX_VALIDATION_DETAILS_LENGTH).evaluateAttributeExpressions(flowFile).asInteger();
+
         final Map<String, String> attributes = new HashMap<>();
         attributes.putAll(writeResult.getAttributes());
         attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
         attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+        if(validationDetailsAttributeName != null && details != null && !details.isEmpty()) {
+            String truncatedDetails = details;
+
+            //Truncating only when it exceeds the configured maximum
+            if (truncatedDetails.length() > maxValidationDetailsLength) {
+                truncatedDetails = truncatedDetails.substring(0, maxValidationDetailsLength);
+            }
+
+            attributes.put(validationDetailsAttributeName, truncatedDetails);
+        }
+
         session.putAllAttributes(flowFile, attributes);
 
         session.transfer(flowFile, relationship);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
index 74c3a17..895e3de 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
@@ -594,4 +594,50 @@ public class TestValidateRecord {
         }
     }
 
+    // Invalid records should get their validation details written to the configured attribute, truncated to the configured maximum length.
+    @Test
+    public void testValidationsDetailsAttributeForInvalidRecords() throws InitializationException, IOException {
+        final String schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc")), "UTF-8");
+
+        final CSVReader csvReader = new CSVReader();
+        runner.addControllerService("reader", csvReader);
+        runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_TEXT, schema);
+        runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "false");
+        runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
+        runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
+        runner.enableControllerService(csvReader);
+
+        final MockRecordWriter validWriter = new MockRecordWriter("valid", false);
+        runner.addControllerService("writer", validWriter);
+        runner.enableControllerService(validWriter);
+
+        final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
+        runner.addControllerService("invalid-writer", invalidWriter);
+        runner.enableControllerService(invalidWriter);
+
+        runner.setProperty(ValidateRecord.RECORD_READER, "reader");
+        runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
+        runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
+        // Shorter than the full validation message, so the truncation branch is actually exercised.
+        runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "100");
+        runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails");
+
+        final String content = "1, John Doe\n"
+            + "2, Jane Doe\n"
+            + "Three, Jack Doe\n";
+
+        runner.enqueue(content);
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
+        runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
+
+        final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
+        invalidFlowFile.assertAttributeEquals("record.count", "1");
+        invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
+        invalidFlowFile.assertAttributeEquals("valDetails", "Records in this FlowFile were invalid for the following reasons: ; The following 1 fields had values");
+    }
+
 }