Posted to issues@nifi.apache.org by "Alexander Bukarev (JIRA)" <ji...@apache.org> on 2018/12/28 15:02:00 UTC

[jira] [Commented] (NIFI-5918) MergeRecord works wrong with Defragment strategy

    [ https://issues.apache.org/jira/browse/NIFI-5918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16730307#comment-16730307 ] 

Alexander Bukarev commented on NIFI-5918:
-----------------------------------------

Moreover, I think the processor is complex enough to warrant refactoring (it is well known that the Aggregator pattern is the most complex in EIP). IMHO all merge strategies should be externalized and made pluggable.

Also, the *Defragment* strategy should respect the order of flow files (the {{fragment.index}} flow file attribute); currently it is ignored. Ordering could perhaps be optional, but it should be under the flow developer's control.
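
To illustrate the ordering point, here is a minimal sketch in plain Java, not the NiFi API. Each flow file is modelled simply as a map of its attributes, and the class and method names ({{FragmentOrdering}}, {{sortByFragmentIndex}}) are hypothetical. It only shows the idea of sorting fragments by the numeric value of {{fragment.index}} before merging.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: flow files are modelled as attribute maps,
// not as NiFi FlowFile objects.
public class FragmentOrdering {

    // Returns a new list sorted by the numeric value of "fragment.index".
    // Assumes every map carries a parseable "fragment.index"; a missing or
    // non-numeric value makes Integer.parseInt throw NumberFormatException.
    static List<Map<String, String>> sortByFragmentIndex(List<Map<String, String>> fragments) {
        List<Map<String, String>> sorted = new ArrayList<>(fragments);
        sorted.sort(Comparator.comparingInt(
                (Map<String, String> attrs) -> Integer.parseInt(attrs.get("fragment.index"))));
        return sorted;
    }
}
```

Parsing the attribute as an integer matters here: sorting the raw strings would place "10" before "2".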

P.S. Feel free to assign the issue to me :-) I've already created a quick fix for the problem here: https://github.com/javajefe/nifi/commit/901e42c8962936459e81499cd6dde90b8fe9ee5d
By the way, I'm also thinking of extracting the strategies, etc., as described above.

> MergeRecord works wrong with Defragment strategy
> ------------------------------------------------
>
>                 Key: NIFI-5918
>                 URL: https://issues.apache.org/jira/browse/NIFI-5918
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.8.0
>            Reporter: Alexander Bukarev
>            Priority: Major
>
> *Steps*
> # Create the simple flow: 
> #* {{GenerateFlowFile}} (with constant payload "txt1,txt2" and 10 secs scheduling) 
> #* -> {{SplitContent}} (with comma as a separator)
> #* -> some chain of processors that takes "txt1" and "txt2" as inbound parameters and produces flow files with more than 1 record ((!) that's important). For example, I use {{ExtractText}} (to get "txt1" and "txt2" as an attribute), then {{ExecuteSQLRecord}} (to execute SQL using "txt1" and "txt2" as a parameter)
> #* -> {{MergeRecord}} (with *Defragment* merge strategy - (!) that's important)
> #* -> {{LogAttribute}} or whatever you prefer to observe the merge result
> # Now just run the flow
> *Result:* we'll see an error in logs like {panel}Could not merge bin with 1 FlowFiles because of the 'fragment.count' attribute had a value of '2' but only 1 of 2 FlowFiles were encountered before this bin was evicted (due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).{panel}
> *Expected result:* a single flow file containing the records from both SQL queries (for "txt1" and "txt2")
> The cause is that {{RecordBinManager}} uses the {{fragment.count}} flow file attribute to calculate the number of *records* required to release the bin. However, the attribute contains the number of *flow files* instead. Since each file in the above scenario contains more than 1 record (at least 2), {{RecordBin}} thinks the bin is "full enough" as soon as the first flow file arrives (because it contains >= 2 records and {{fragment.count}} is equal to 2 in the scenario). So the bin is released prematurely.
> I think this is a mistake: in *Defragment* mode we are interested in the number of flow files and never in the number of records. Conversely, we should care about the number of records only when using the *Bin-Packing Algorithm* strategy.
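
The miscount described in the issue can be illustrated with a small, self-contained sketch. This is not the actual {{RecordBinManager}} or {{RecordBin}} code; the class and method names below are hypothetical and only contrast the two ways of deciding that a bin is complete, assuming {{fragment.count}} is 2 and the first flow file carries 3 records.

```java
// Hypothetical illustration only: not the NiFi implementation.
public class DefragmentCheck {

    // Behaviour described in the report: the number of records in the bin
    // is compared with fragment.count.
    static boolean isCompleteByRecords(int recordCount, int fragmentCount) {
        return recordCount >= fragmentCount;
    }

    // Behaviour the report argues for in Defragment mode: the number of
    // flow files in the bin is compared with fragment.count.
    static boolean isCompleteByFlowFiles(int flowFileCount, int fragmentCount) {
        return flowFileCount >= fragmentCount;
    }

    public static void main(String[] args) {
        int fragmentCount = 2; // value of fragment.count in the scenario

        // The first flow file arrives carrying 3 records.
        System.out.println(isCompleteByRecords(3, fragmentCount));   // true: bin released too early
        System.out.println(isCompleteByFlowFiles(1, fragmentCount)); // false: still waiting for 1 more

        // The second flow file arrives.
        System.out.println(isCompleteByFlowFiles(2, fragmentCount)); // true: both fragments present
    }
}
```

With the record-based check, the bin counts as complete after one flow file, which matches the "only 1 of 2 FlowFiles were encountered" error quoted above.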


