Posted to users@nifi.apache.org by Mark Petronic <ma...@gmail.com> on 2015/10/25 04:44:45 UTC

Suggestions for good approach to ETL strategy

So, I stumbled onto NiFi at a Laurel, MD Spark meetup and was pretty
excited about using it. I'm running HDP and need to construct an
ETL-like flow, and, as a new user to NiFi, I would like to start with
a "best practice" approach. Wondering if some of you more seasoned
users might provide some thoughts on my problem?

1. 160 zip files/day show up on an NFS share in various subdirectories,
and their filenames contain the yyyymmddHHMMSS timestamp of when the
stats were generated.
2. Each zip file contains 4 or more large CSV files
3. I need just one of those CSVs from each zip file each day and they
all add up to about 10GB uncompressed
4. I need to extract that one file from each zip, strip off the first
line (the headers), and store it in HDFS compressed again using gzip
or snappy
5. I cannot delete the NFS files after the copy to HDFS because others
need access to them for some time

So, where I am having a hard time visualizing this in NiFi is with the
first step. I need to scan the NFS files after 8 AM every day (when I
know all files for the previous 24 hours will be present), find that
day's set of files using the yyyymmdd part of the filenames, then
extract the one file I need from each and process it into HDFS.

I could imagine a processor that runs once every 24 hours on a cron
schedule. I could imagine running an ExecuteProcess processor against
a bash script to get the list of all the files that match the
yyyymmdd. Then I get stuck: how do I take this list of 160 file paths
and kick off processing each one of them in parallel through the ETL
flow?
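
(As a rough illustration of the listing step I have in mind, something
like the following; the share path and name pattern are placeholders,
not my real layout.)

====== list_files.sh (sketch) ======
#!/usr/bin/env bash
# List the day's zip files by the yyyymmdd embedded in their names.
datefilter=${1:-$(date +%Y%m%d)}
find /path/to/nfs/share -type f -name "*${datefilter}*.zip" | sort
====================================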

Thanks in advance for any ideas

Re: Suggestions for good approach to ETL strategy

Posted by Mark Petronic <ma...@gmail.com>.
Lee, Mark, Joe, I have to say I am overwhelmed with the enthusiastic,
quick, and helpful responses from you all. Seems there is clearly very
active interest in the project. You've given me some good ideas to
consider.

So, pondering all that you've suggested, here's where I am now - a
three-stage flow.

1. ExecuteProcess - Runs streamer.sh below, which streams out the
contents of a specific file type ${StatType} from within the many
(160) zip files that exist across the GW1...GW13 directories and that
have a specific yyyymmdd date in the filename. I also strip off the
headers using grep here (any line that contains the word "Device" is a
header). This script gives me a long-running (5 minutes or so) stream
of data, around 10 GB total. I set Batch Duration = 5 secs.

====== streamer.sh =======
#!/usr/bin/env bash
# Stream one CSV type out of each day's zip files, dropping header lines.
datefilter=$1   # e.g. 20151025
stattype=$2     # substring of the CSV member name to extract
for f in /import/nms/prod/stats/Terminal/GW{1..13}/ConsolidatedTermStats_${datefilter}*
do
  [ -e "$f" ] || continue   # skip patterns that matched nothing
  # Quote the member pattern so unzip, not the shell, expands the wildcard.
  unzip -p "$f" "*${stattype}*" | grep -v Device
done
=======================
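
(For reference, ExecuteProcess invokes it roughly like this; the date
and stat type here are just example placeholders.)

====== example invocation ======
bash streamer.sh 20151025 UsageStats
================================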

2. MergeContent - Mark suggested this and I like this idea to control
the size of my HDFS files. I use all defaults except Maximum Group
Size = 512 MB

3. PutHDFS - Using the built-in Compression Codec setting here to
shrink the 512 MB files

I like this approach because I don't want to copy 10 GB to a local
store, unzip all the files, and then copy them again to HDFS. I'd
really like to maintain a streaming architecture as much as possible,
and I think that is what I achieve here. However, I still have some
confusion.

1. I don't fully understand what is happening in ExecuteProcess.
There, I set Batch Duration = 5 secs. I was thinking this would read
in 5 seconds of the stream, then generate a flow file that would be
sent to the next stage (MergeContent), where it would be merged and
eventually output from that stage as another merged flow file to
PutHDFS. Or is it like this: say there are 5 batched flow files that
get merged and reach the 512 MB threshold. Internally, are references
to those 5 just passed along to the next stage for processing, or is a
6th flow file produced that is the concatenation of the 5, with that
passed to the next stage and the 5 inputs released because we are done
with them?

Mainly, my confusion is with the interplay of this batching and the
Run Schedule and Run Duration.

- My understanding is that, if both Run Schedule and Run Duration are
zero, and I set this processor up on a cron schedule, then, when it is
time to run, it will launch my streaming script and run until the
script terminates. Is that correct?

- And during that running time, it will generate a new flow file every
5 seconds due to the batching configuration. Correct?

- So, under the hood, what really happens with all the flow files that
are batched and merged once the merged file is output? Does the disk
space held by the batched flow files get released in the content
repository? It seemed like disk usage kept going up in the content
repository while this flow was running. I was anticipating that the
batching would sort of limit the growth on my local disk, since I was
streaming my puts to HDFS in batches.

Sorry for so many questions, but documentation is still very sparse on
this project (I guess because it is so new to OSS) and I really want
to understand it thoroughly. I did clone the project and have been
scanning the code as well.


Re: Suggestions for good approach to ETL strategy

Posted by Joe Witt <jo...@gmail.com>.
Thanks for jumping in Lee!

Mark,

This is a great writeup.  We should turn this into a blog w/full
explanation and template.  Great use case and you just gave us a
perfect user perspective/explanation of how you're thinking of it.

We will make that happen quickly.

https://issues.apache.org/jira/browse/NIFI-1064

Thanks
Joe


Re: Suggestions for good approach to ETL strategy

Posted by Mark Payne <ma...@hotmail.com>.
Hey Mark,

Thanks for sharing your use case with us in pretty good detail so that we can understand
what you're trying to do here.

There are actually a few processors coming in the next release that I think should help here.
First, there's the FetchFile processor that you noticed in NIFI-631. Hopefully the ListFile will
make its way in there as well because it's much easier that way :) In either case, you can right-click
on the Processor and click Configure. If you go to the Scheduling tab, you can change the Scheduling
Strategy to CRON-Driven and set the schedule to run whenever you'd like.
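
As a rough example, the CRON-Driven strategy takes a Quartz-style cron
expression, so a daily 8 AM kickoff like the one you described would look
something like this (the exact time is yours to pick):

====== example cron schedule ======
# Quartz fields: seconds minutes hours day-of-month month day-of-week
0 0 8 * * ?
===================================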

As-is, the GetFile is expected
to remove the file from the current location, as the idea was that NiFi would sort of assume
ownership of the file. It turns out that in the Open Source world, that's often not desirable, so
we are moving more toward the List/Fetch pattern as described in that ticket.

Once you pull the files into NiFi, though, UnpackContent should unzip the files, each into its
own FlowFile. You could then use RouteOnAttribute to pull out just the file that you care about,
based on its filename. You can then allow the others to be routed to Unmatched and auto-terminate
them from the flow.
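
For example, the routing could be a simple Expression Language check
against the filename; the property name and the 'TermStats' substring
below are just placeholders for whatever identifies the CSV you want:

====== example RouteOnAttribute property ======
# Add a dynamic property; its name becomes a relationship you can route on.
stats-csv : ${filename:contains('TermStats')}
===============================================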

Stripping off the first line could probably be done using ReplaceText, but in the next version
of NiFi, we will have a RouteText processor that should make working with CSVs far easier. You could,
for instance, route any line that begins with # to one relationship and the rest to a second relationship.
This effectively allows you to filter out the header line.
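
The same idea outside of NiFi is just a line-level match; a minimal shell
illustration of that routing, assuming header lines start with '#':

====== header filter, shell equivalent ======
# Lines starting with '#' go one way (dropped here); everything else passes.
grep -v '^#' input.csv > data_only.csv
=============================================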

Finally, you can use PutHDFS and set the Compression Codec to whatever you prefer: GZIP, Snappy, etc.
Prior to that, if you need to, you could also add in a MergeContent processor to concatenate
these CSV files together and make them larger.
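
Once the data lands, a quick sanity check from the command line is easy. A
sketch, assuming a gzip codec and an /etl/stats target directory (both
placeholders):

====== checking the HDFS output (sketch) ======
hdfs dfs -ls /etl/stats
# 'hdfs dfs -text' decompresses known codecs (e.g. gzip) before printing.
hdfs dfs -text /etl/stats/merged-000001.gz | head
===============================================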

Thanks
-Mark



Re: Suggestions for good approach to ETL strategy

Posted by Lee Laim <le...@gmail.com>.
Mark,

I'm far from seasoned, but I'll take a swing at it to check my understanding
(or lack thereof). I'd break the task into 2 parts:

Identify and move the files to a staging location, then process the zip files
from the staging location.


Flow1:

Run a cron-driven *GenerateFlowFile* processor to start the process every
24 hours, after 8 AM ->

*ExecuteStreamCommand* to run your bash script to stream the list of the
160 zip filenames of interest into ->

*SplitText* processor to generate a new flow file for each zip filename.
This can be routed into a

*DistributeLoad* processor, which will distribute the flow files to

*ExtractText* processors to extract the text out of the flow file
(extracted contents: filename and path), then pass to

*UpdateAttribute* so the filename and path can be accessed via NiFi
Expression Language.  Pass the flow file to

*ExecuteStreamCommand* (cp /${path_attribute}/${filename}
/location2/${filename}) - this copies the zip file to another directory
(location2), keeping the files at the source for other users. (A quoted
version of that copy step is sketched below.)
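
====== copy-to-staging sketch ======
#!/usr/bin/env bash
# Rough sketch of the copy step, with quoting added.
# $1 = directory from the 'path' attribute, $2 = the 'filename' attribute.
# /location2 is a placeholder for the staging directory.
src_dir=$1
fname=$2
cp "${src_dir}/${fname}" "/location2/${fname}"
====================================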



Flow2:
        *GetFile* from location2 -> *UnpackContent* -> *RouteOnAttribute* (to
select the CSV of interest, discard the rest) -> *ExecuteStreamCommand*
(sed '1d') to remove the header -> *CompressContent* -> *PutHDFS*
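
(The per-file transformation in Flow2, written as a plain shell pipeline
for clarity; the zip name and member pattern are placeholders:)

====== flow2 transformation, shell equivalent ======
# Extract one CSV member from the zip, drop its header line, re-compress.
unzip -p stats.zip "*SomeStatType*" | sed '1d' | gzip > stats_noheader.csv.gz
====================================================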



Hope this helps, and I hope this isn't too far off.

Thanks,
Lee


Re: Suggestions for good approach to ETL strategy

Posted by Mark Petronic <ma...@gmail.com>.
Reading some other posts, I stumbled on this JIRA [1], which seems to
directly relate to my question in this post.

[1] https://issues.apache.org/jira/browse/NIFI-631
