You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by Arnaud G <gr...@gmail.com> on 2017/02/17 09:12:55 UTC

Issue with a simple flow

Hello,

I'm trying something very simple but I cannot get to make it work properly
(Nifi 1.1.1)

You will see the flow here: https://goo.gl/photos/9ewQV6ovKDDZe2pw9

My goal is to pull a json file out of HDFS, get some information and store
it back as a CSV something really trivial.

My flow is structured like that:

1) GetHDFS (here I check every 2 minutes and only take 1 file
2) EvaluateJsonPath: The JSON is looking like that {version: xxxx,
attributesX[{.....}], I need to add to each line of my CSV the version #.
So I use this processor to store the version in an attribute
3) SplitJson: I split the JSON on the attributesX field ($.attributesY),
this provide me each line as expected
4) EvaluateJsonPath: I'm extracting the different information and I store
them as attribute
5) UpdateAttribute: I use that one to remove new line characters in one
attribute
6) ReplaceText: I transform my attributes in CSV format
7) MergeContent: This is where I have a problem: now I want to regroup all
my lines to recreate the original file but in CSV. I have tried different
strategies but I'm unable to get a consistent result. I use the ${filename}
as the correlation attribute but what happen is that most of the time I
have only 25% of the lines when it finalizes the bin and send it to the
next processor (I'm using the last updateattribute to change the {filename}
to .csv), then I get the rest of the line but with the same filename which
will either fail or overwrite the first file.

Can you tell me what I'm doing wrong in the flow, how can I ensure that all
the line are merged before the merge process finish? Is there something as
minimal bin age that will let me make sure that I have waited enough to get
all the lines?

Thank you


AG

Re: Issue with a simple flow

Posted by James Wing <jv...@gmail.com>.
MergeContent's "Defragment" merge strategy might be enough.  It is not as
full-featured as the Wait/Notify design Andy suggested, but it might be
something you can use now.

Defragment reads attributes from the flowfiles to more precisely group your
content together than correlation attribute alone:
 - fragment.identifier, group id similar to your correlation attribute
${filename}, but see below
 - fragment.index, part number 0..n
 - fragment.count, total number of n parts to wait for

Conveniently, SplitJson provides these attributes automagically (take a
look at your flowfiles).  You may only need to switch MergeContent's Merge
Strategy to "Defragment".  I recommend you increase the Max Bin Age to
generously allow time for the flowfiles to arrive, otherwise the unmerged
parts will be sent to the failure relationship.

Thanks,

James

On Fri, Feb 17, 2017 at 10:15 AM, Andy LoPresto <al...@apache.org>
wrote:

> Hi Arnaud,
>
> If you are running from master (unreleased 1.2.0-SNAPSHOT) you can use an
> awesome feature Koji added recently called “Wait/Notify” [1]. He explains
> it much better with graphics and such, but the long and short of it is that
> the Split* processors output the total number of split records from an
> input, and then you can use counters to ensure you reconstitute the whole
> thing when merging.
>
> If you are not on master, I’d say you might have to do a little more work.
> There is a “minimum number of entries” property in MergeContent, but I’m
> not sure off the top of my head how you can get that value from the Split
> and update it dynamically, as it could obviously be different for each
> input. Hopefully Koji or someone else on the list will have a better
> solution.
>
> [1] https://github.com/apache/nifi/pull/1420
>
> Andy LoPresto
> alopresto@apache.org
> *alopresto.apache@gmail.com <al...@gmail.com>*
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>
> On Feb 17, 2017, at 1:12 AM, Arnaud G <gr...@gmail.com> wrote:
>
> Hello,
>
> I'm trying something very simple but I cannot get to make it work properly
> (Nifi 1.1.1)
>
> You will see the flow here: https://goo.gl/photos/9ewQV6ovKDDZe2pw9
>
> My goal is to pull a json file out of HDFS, get some information and store
> it back as a CSV something really trivial.
>
> My flow is structured like that:
>
> 1) GetHDFS (here I check every 2 minutes and only take 1 file
> 2) EvaluateJsonPath: The JSON is looking like that {version: xxxx,
> attributesX[{.....}], I need to add to each line of my CSV the version #.
> So I use this processor to store the version in an attribute
> 3) SplitJson: I split the JSON on the attributesX field ($.attributesY),
> this provide me each line as expected
> 4) EvaluateJsonPath: I'm extracting the different information and I store
> them as attribute
> 5) UpdateAttribute: I use that one to remove new line characters in one
> attribute
> 6) ReplaceText: I transform my attributes in CSV format
> 7) MergeContent: This is where I have a problem: now I want to regroup all
> my lines to recreate the original file but in CSV. I have tried different
> strategies but I'm unable to get a consistent result. I use the ${filename}
> as the correlation attribute but what happen is that most of the time I
> have only 25% of the lines when it finalizes the bin and send it to the
> next processor (I'm using the last updateattribute to change the {filename}
> to .csv), then I get the rest of the line but with the same filename which
> will either fail or overwrite the first file.
>
> Can you tell me what I'm doing wrong in the flow, how can I ensure that
> all the line are merged before the merge process finish? Is there something
> as minimal bin age that will let me make sure that I have waited enough to
> get all the lines?
>
> Thank you
>
>
> AG
>
>
>

Re: Issue with a simple flow

Posted by Andy LoPresto <al...@apache.org>.
Hi Arnaud,

If you are running from master (unreleased 1.2.0-SNAPSHOT) you can use an awesome feature Koji added recently called “Wait/Notify” [1]. He explains it much better with graphics and such, but the long and short of it is that the Split* processors output the total number of split records from an input, and then you can use counters to ensure you reconstitute the whole thing when merging.

If you are not on master, I’d say you might have to do a little more work. There is a “minimum number of entries” property in MergeContent, but I’m not sure off the top of my head how you can get that value from the Split and update it dynamically, as it could obviously be different for each input. Hopefully Koji or someone else on the list will have a better solution.

[1] https://github.com/apache/nifi/pull/1420 <https://github.com/apache/nifi/pull/1420>

Andy LoPresto
alopresto@apache.org
alopresto.apache@gmail.com
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Feb 17, 2017, at 1:12 AM, Arnaud G <gr...@gmail.com> wrote:
> 
> Hello,
> 
> I'm trying something very simple but I cannot get to make it work properly (Nifi 1.1.1)
> 
> You will see the flow here: https://goo.gl/photos/9ewQV6ovKDDZe2pw9 <https://goo.gl/photos/9ewQV6ovKDDZe2pw9>
> 
> My goal is to pull a json file out of HDFS, get some information and store it back as a CSV something really trivial.
> 
> My flow is structured like that:
> 
> 1) GetHDFS (here I check every 2 minutes and only take 1 file
> 2) EvaluateJsonPath: The JSON is looking like that {version: xxxx, attributesX[{.....}], I need to add to each line of my CSV the version #. So I use this processor to store the version in an attribute
> 3) SplitJson: I split the JSON on the attributesX field ($.attributesY), this provide me each line as expected
> 4) EvaluateJsonPath: I'm extracting the different information and I store them as attribute
> 5) UpdateAttribute: I use that one to remove new line characters in one attribute
> 6) ReplaceText: I transform my attributes in CSV format
> 7) MergeContent: This is where I have a problem: now I want to regroup all my lines to recreate the original file but in CSV. I have tried different strategies but I'm unable to get a consistent result. I use the ${filename} as the correlation attribute but what happen is that most of the time I have only 25% of the lines when it finalizes the bin and send it to the next processor (I'm using the last updateattribute to change the {filename} to .csv), then I get the rest of the line but with the same filename which will either fail or overwrite the first file.
> 
> Can you tell me what I'm doing wrong in the flow, how can I ensure that all the line are merged before the merge process finish? Is there something as minimal bin age that will let me make sure that I have waited enough to get all the lines?
> 
> Thank you
> 
> 
> AG
>