Posted to issues@nifi.apache.org by martin-mucha <gi...@git.apache.org> on 2018/01/23 11:56:00 UTC

[GitHub] nifi pull request #2425: Emit failures array

GitHub user martin-mucha opened a pull request:

    https://github.com/apache/nifi/pull/2425

    Emit failures array

    2 patches:
    The 1st is a proposal for a small refactoring of ValidateRecord, which should significantly simplify its inner structure.
    The 2nd introduces a new property; if set, an array of validation errors will be stored in a FlowFile attribute named according to the new property's value.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/martin-mucha/nifi emitFailuresArray

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2425.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2425
    
----
commit cc62bd8d743496932c1fec71ed973378b4ed27b4
Author: Martin Mucha <al...@...>
Date:   2018-01-22T10:59:01Z

    NIFI-4745: validate record refactoring
    
    all potential future expansion of this class is made pointlessly
    harder, due to its questionable structure, therefore I decided to
    provide refactor of it
    
    Signed-off-by: Martin Mucha <al...@gmail.com>

commit 548c1fe6d88dfd72a487a48e998f4eea1da57220
Author: Martin Mucha <al...@...>
Date:   2018-01-23T08:03:21Z

    NIFI-4745: new property to store validation errors
    
    added new property. If set, array of failures will be stored into
    attribute named according to value of this property
    
    Signed-off-by: Martin Mucha <al...@gmail.com>

----


---

[GitHub] nifi pull request #2425: Emit failures array

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2425#discussion_r164487280
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java ---
    @@ -242,11 +279,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             final boolean allowExtraFields = context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean();
             final boolean strictTypeChecking = context.getProperty(STRICT_TYPE_CHECKING).asBoolean();
     
    -        RecordSetWriter validWriter = null;
    -        RecordSetWriter invalidWriter = null;
             FlowFile validFlowFile = null;
             FlowFile invalidFlowFile = null;
     
    +        final List<Record> validRecords = new LinkedList<>();
    --- End diff --
    
    We need to be sure that we are not storing collections of records on the heap but rather that we are writing them out in a streaming fashion. One of the goals of the record API is to allow arbitrarily large FlowFiles that are made up of small records. So if we have a 1 GB CSV file, for instance, holding its records in a list would result in OutOfMemoryErrors very quickly.
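
To illustrate the streaming approach described in this comment, here is a minimal sketch in plain Java. It does not use the NiFi API: the class `StreamingSplitSketch`, the method `route`, and the "two comma-separated fields" validity rule are all hypothetical and exist only for illustration. Each record is read, checked, and written to the matching output immediately, so only the current record is held in memory regardless of input size.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.function.Predicate;

// Hypothetical plain-Java sketch; it does not use the NiFi API.
final class StreamingSplitSketch {

    /**
     * Reads one line-oriented record at a time and writes it straight to the
     * matching output, so no collection of records is ever built up.
     * Returns the number of invalid records seen.
     */
    static int route(Reader in, Writer valid, Writer invalid,
                     Predicate<String> isValid) throws IOException {
        int invalidCount = 0;
        BufferedReader reader = new BufferedReader(in);
        BufferedWriter validOut = new BufferedWriter(valid);
        BufferedWriter invalidOut = new BufferedWriter(invalid);
        String record;
        while ((record = reader.readLine()) != null) {
            if (isValid.test(record)) {
                validOut.write(record);
                validOut.write('\n');
            } else {
                invalidCount++;
                invalidOut.write(record);
                invalidOut.write('\n');
            }
        }
        // Flush the buffers; the caller owns and closes the underlying writers.
        validOut.flush();
        invalidOut.flush();
        return invalidCount;
    }

    public static void main(String[] args) throws IOException {
        StringWriter valid = new StringWriter();
        StringWriter invalid = new StringWriter();
        // Assumption for this demo: a record is valid if it has exactly two fields.
        int bad = route(new StringReader("a,1\nbroken\nb,2\n"), valid, invalid,
                line -> line.split(",").length == 2);
        System.out.println("invalid records: " + bad);
    }
}
```

The alternative, appending every record to a `List` and writing the list out at the end, would keep the whole input on the heap at once, which is the pattern the comment warns against.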


---

[GitHub] nifi pull request #2425: Emit failures array

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2425#discussion_r164489104
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java ---
    @@ -166,6 +171,16 @@
             .required(true)
             .build();
     
    +
    +    static final PropertyDescriptor ATTRIBUTE_NAME_TO_STORE_FAILURE_DESCRIPTION = new PropertyDescriptor.Builder()
    +        .name("emit-failure-description-property")
    +        .displayName("Variable Describing Parse Failure")
    --- End diff --
    
    We should use the word 'attribute' here, rather than 'variable', as these have different meanings in the context of NiFi.


---

[GitHub] nifi pull request #2425: Emit failures array

Posted by martin-mucha <gi...@git.apache.org>.
Github user martin-mucha commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2425#discussion_r166457681
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java ---
    @@ -242,11 +279,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             final boolean allowExtraFields = context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean();
             final boolean strictTypeChecking = context.getProperty(STRICT_TYPE_CHECKING).asBoolean();
     
    -        RecordSetWriter validWriter = null;
    -        RecordSetWriter invalidWriter = null;
             FlowFile validFlowFile = null;
             FlowFile invalidFlowFile = null;
     
    +        final List<Record> validRecords = new LinkedList<>();
    --- End diff --
    
    Understood, but one question. I did all this refactoring to get rid of the 'surprising' complexity of the code. Now, if I call "writer.write(record);", won't the given record be held on the heap until completeFlowFile is called? Where is the FlowFile stored until it is 'completed'? If it's held outside of the heap, then all this refactoring is indeed invalid. If it's also on the heap ...


---

[GitHub] nifi pull request #2425: Emit failures array

Posted by ijokarumawak <gi...@git.apache.org>.
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2425#discussion_r168057832
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java ---
    @@ -242,11 +279,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             final boolean allowExtraFields = context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean();
             final boolean strictTypeChecking = context.getProperty(STRICT_TYPE_CHECKING).asBoolean();
     
    -        RecordSetWriter validWriter = null;
    -        RecordSetWriter invalidWriter = null;
             FlowFile validFlowFile = null;
             FlowFile invalidFlowFile = null;
     
    +        final List<Record> validRecords = new LinkedList<>();
    --- End diff --
    
    Hi @martin-mucha 
    Let me try to answer your question. @markap14 will correct me if I'm wrong :)
    
    The [ValidateRecord.completeFlowFile](https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java#L408) method calls `writer.finishRecordSet()`, which lets the writer write the closing marker of the record set, since some record formats require one; a JSON '}' or an XML '</root>' are easy examples to imagine. The actual bytes of the record contents have already been written by that point.
    
    I'd recommend reading [NiFi in depth, Content Repository](https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#content-repository) on how NiFi reads and writes FlowFile content in a streaming manner without loading the whole content onto the heap.
    
    If you're interested in reading code, [StandardProcessSession.write](https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java#L2433) might be a good starting point for how a FlowFile and its OutputStream are created.
    
    That OutputStream is then passed to the RecordSetWriter implementation. For example, when a processor writes a record, the record is handed to a method of the configured RecordSetWriter, such as
    [WriteCSVResult.writeRecord](https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java#L147).
    
    These RecordSetWriter implementations do not hold content on the heap. They write records in a streaming manner.
    
    If we create a List and hold `Record` instances in it, then we keep the content on the heap as `Record` instances, which can lead to an OOM.
    
    Hope this helps!
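
The write-then-finish pattern described in the comment above can be sketched roughly as follows. This is a hypothetical stand-in written in plain Java, not the NiFi `RecordSetWriter` API: the class `StreamingJsonArrayWriter` and its `begin`, `write`, and `finish` methods are assumptions made for illustration, and the JSON-array framing is just one possible format. The point it shows is that each record's bytes go to the underlying `OutputStream` as soon as `write` is called, while `finish` emits only the closing marker.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a record writer; not the NiFi RecordSetWriter API.
final class StreamingJsonArrayWriter {
    private final OutputStream out;
    private int recordCount = 0;

    StreamingJsonArrayWriter(OutputStream out) {
        this.out = out;
    }

    /** Writes the opening marker of the record set. */
    void begin() throws IOException {
        out.write('[');
    }

    /** Writes one record's bytes immediately; the writer keeps no reference to it. */
    void write(String jsonRecord) throws IOException {
        if (recordCount > 0) {
            out.write(',');
        }
        out.write(jsonRecord.getBytes(StandardCharsets.UTF_8));
        recordCount++;
    }

    /** Writes only the closing marker and returns how many records were written. */
    int finish() throws IOException {
        out.write(']');
        out.flush();
        return recordCount;
    }

    public static void main(String[] args) throws IOException {
        // A ByteArrayOutputStream is used here only so the demo can print the result;
        // a file or socket stream would keep the content off the heap.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        StreamingJsonArrayWriter writer = new StreamingJsonArrayWriter(sink);
        writer.begin();
        writer.write("{\"id\":1}");
        writer.write("{\"id\":2}");
        int count = writer.finish();
        System.out.println(count + " records: " + sink.toString("UTF-8"));
    }
}
```

Because the only state the writer keeps is a counter, its memory use stays constant however many records pass through it.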


---

[GitHub] nifi pull request #2425: Emit failures array

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2425#discussion_r164488915
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java ---
    @@ -179,6 +194,27 @@
             .description("If the records cannot be read, validated, or written, for any reason, the original FlowFile will be routed to this relationship")
             .build();
     
    +    private static final ObjectMapper objectMapper = new ObjectMapper();
    +
    +
    +    private static Validator createAttributeNameValidator() {
    --- End diff --
    
    I don't believe this validator is needed. Attribute names can be anything except null and the empty string. A validator for this already exists: StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR.


---