Posted to dev@nifi.apache.org by Martin Eden <ma...@gmail.com> on 2017/05/31 09:20:25 UTC

SplitText processor OOM larger input files

Hi all,

I have a vanilla NiFi 1.2.0 node with 1GB of heap.

The flow I am trying to run is:
ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent ->
PutHDFS

When I give it a 300MB input zip file (2.5GB uncompressed) I get the
Java OutOfMemoryError shown below.

Does NiFi read the entire contents of a file into memory? That would be
unexpected; I thought it chunked through files. Giving it more RAM is not
a solution, as larger input files can always show up in the future.

Does this mean NiFi is not suitable as a scalable ETL solution?

Can someone please explain what is happening and how to handle large
files in NiFi? Are there any established patterns?

Thanks,
M

ERROR [Timer-Driven Process Thread-9]
o.a.nifi.processors.standard.SplitText
SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to process
session due to java.lang.OutOfMemoryError: Java heap space: {}

java.lang.OutOfMemoryError: Java heap space
        at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
        at java.util.HashMap.putMapEntries(HashMap.java:511)
        at java.util.HashMap.<init>(HashMap.java:489)
        at org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.initializeAttributes(StandardFlowFileRecord.java:219)
        at org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.addAttributes(StandardFlowFileRecord.java:234)
        at org.apache.nifi.controller.repository.StandardProcessSession.putAllAttributes(StandardProcessSession.java:1723)
        at org.apache.nifi.processors.standard.SplitText.updateAttributes(SplitText.java:367)
        at org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(SplitText.java:320)
        at org.apache.nifi.processors.standard.SplitText.onTrigger(SplitText.java:258)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

Re: SplitText processor OOM larger input files

Posted by Mark Payne <ma...@hotmail.com>.
Martin,

I agree with Andrew that a single virtual core is not much. I've not really
dealt with Google Compute Engine personally, but on AWS the t1.micro instances
offer a similar VM, and I don't expect much out of those.

That being said, let's look a bit deeper at some of the performance considerations that we
should take into account.

Given that you have < 4 GB RAM and your CSV file is 2.5 GB uncompressed,
it is quite possible that your operating system's disk cache is not benefitting you
much. In such a case, you'd be hitting the disks quite a lot, just to read the data. So adding
some more RAM would likely help there as well. Quite often, with a pretty reasonable amount
of RAM, NiFi will never (or almost never) have to read FlowFile content from disk because the
disk cache will have it buffered for us. This makes a huge difference in performance.

You're also splitting each incoming FlowFile into *LOTS* of tiny FlowFiles. This has quite a bit
of overhead. Each FlowFile has to be tracked in the FlowFile Repository, and everything that happens
gets recorded in the Provenance Repository. This can be very expensive for large numbers of FlowFiles.
RouteOnContent could probably also be looked into, as I think it may be doing some not-so-efficient
things when scanning content (namely, it is not buffering the data itself and is instead using a BufferedInputStream
to read one byte at a time, and this is very expensive).
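
To make the byte-at-a-time point concrete, here is a minimal sketch. It is not NiFi code, just a hypothetical Python stand-in that counts how many read calls it takes to scan the same content one byte at a time versus in chunks. The function names, the 8192-byte chunk size and the synthetic 100-byte lines are assumptions made for illustration only.

```python
import io


def scan_bytewise(stream):
    """Count newlines using one read call per byte; return (newlines, read_calls)."""
    newlines = 0
    calls = 0
    while True:
        b = stream.read(1)
        calls += 1
        if not b:  # empty result signals end of stream
            break
        if b == b"\n":
            newlines += 1
    return newlines, calls


def scan_chunked(stream, chunk_size=8192):
    """Count newlines reading chunk_size bytes per call; return (newlines, read_calls)."""
    newlines = 0
    calls = 0
    while True:
        chunk = stream.read(chunk_size)
        calls += 1
        if not chunk:
            break
        newlines += chunk.count(b"\n")
    return newlines, calls


# Synthetic data: 10,000 lines of 100 bytes each (about 1 MB in total).
line = b"x" * 99 + b"\n"
data = line * 10_000

bytewise = scan_bytewise(io.BytesIO(data))
chunked = scan_chunked(io.BytesIO(data))
print("byte-at-a-time:", bytewise)
print("chunked:       ", chunked)
```

Both scans find the same number of lines, but the byte-at-a-time version needs roughly one call per byte, so its per-call overhead is paid about a million times per megabyte instead of a few hundred times.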

The good news is that you can probably simplify your flow and significantly improve NiFi's performance without
a tremendous amount of effort :)

Because you are using CSV data, rather than SplitText -> SplitText -> RouteOnContent -> MergeContent
you can probably replace all of this with a single RouteText processor.
The RouteText processor allows you to use Expression Language to route each line of text individually. So if, for
example, you have CSV coming in that looks like this:

id, name, dob, gender
1, John Doe, 11/02/1992, M
2, Jane Doe, 11/02/1993, F
3, Jacob Doe, 10/02/2014, M
4, Janine Doe, 02/05/2012, F

You could add a new property named 'boys' with a value of ${line:substringAfterLast(', '):equals('M')}
and another property named 'girls' with a value of ${line:substringAfterLast(', '):equals('F')}

This will take the one incoming FlowFile and output two FlowFiles: one to the 'boys' relationship that has
the lines for John Doe and Jacob Doe, and one to the 'girls' relationship that has the lines for
Jane Doe and Janine Doe. This means that the FlowFiles never have to be split up and merged again.
It also bypasses the RouteOnContent processor, which appears to be your bottleneck.
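
The per-line routing described above can be sketched outside of NiFi as follows. This is a rough Python approximation of the idea, not RouteText itself: `substring_after_last`, `ROUTES`, `route_lines` and the 'unmatched' bucket are hypothetical names chosen for illustration, and the two predicates only mimic the 'boys' and 'girls' expressions from the example.

```python
from collections import defaultdict


def substring_after_last(value, delimiter):
    """Return the text after the last delimiter, or the whole value if it is absent."""
    return value.rpartition(delimiter)[2]


# Stand-ins for the two properties from the example above.
ROUTES = {
    "boys": lambda line: substring_after_last(line, ", ") == "M",
    "girls": lambda line: substring_after_last(line, ", ") == "F",
}


def route_lines(lines, routes):
    """Single pass over the lines: each line goes to every matching route,
    or to 'unmatched' when no route matches."""
    routed = defaultdict(list)
    for line in lines:
        matched = False
        for name, predicate in routes.items():
            if predicate(line):
                routed[name].append(line)
                matched = True
        if not matched:
            routed["unmatched"].append(line)
    return dict(routed)


csv_text = """id, name, dob, gender
1, John Doe, 11/02/1992, M
2, Jane Doe, 11/02/1993, F
3, Jacob Doe, 10/02/2014, M
4, Janine Doe, 02/05/2012, F"""

routed = route_lines(csv_text.splitlines(), ROUTES)
for name, lines in routed.items():
    print(name, lines)
```

Note that in this sketch the header line matches neither predicate and ends up in the 'unmatched' bucket. The input is read once and grouped as it goes, so no per-line objects need to be created, tracked and merged back together.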

Beyond this, there are a few processors on the 'master' branch right now, which I presume will be released in version
1.3.0 of NiFi, that may help to make this easier, as well. Namely, the PartitionRecord processor. Version 1.2.0 of NiFi
introduced the notion of Record Readers and Record Writers. There is a CSV Reader and a CSV Writer. So you could make
use of the PartitionRecord processor, as well. For example, you could configure PartitionRecord with a property named
'gender' and a value of '/gender' and this will automatically group together CSV records that have the same value for the
'gender' column and will also add an Attribute named 'gender' to the FlowFile that has a value of either 'M' or 'F' in this case.
So you can use RouteOnAttribute afterward to route as appropriate. The Processor has a good bit of documentation and
examples, if that's a route that you're interested in taking.
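
As a rough illustration of the partitioning idea (group records that share a value for one field, and tag each group with that value), here is a minimal Python sketch. It is not the PartitionRecord implementation; `partition_records` and the shape of its return value are assumptions made for this example, with the "attributes" entry loosely standing in for the FlowFile attribute mentioned above.

```python
import csv
import io
from collections import defaultdict


def partition_records(csv_text, field):
    """Group CSV records by the value of one column.

    Returns {value: {"attributes": {field: value}, "records": [...]}}.
    """
    # skipinitialspace handles the ", " separators used in the sample data.
    reader = csv.DictReader(io.StringIO(csv_text), skipinitialspace=True)
    partitions = defaultdict(list)
    for record in reader:
        partitions[record[field]].append(record)
    return {
        value: {"attributes": {field: value}, "records": records}
        for value, records in partitions.items()
    }


csv_text = """id, name, dob, gender
1, John Doe, 11/02/1992, M
2, Jane Doe, 11/02/1993, F
3, Jacob Doe, 10/02/2014, M
4, Janine Doe, 02/05/2012, F"""

parts = partition_records(csv_text, "gender")
for value, part in parts.items():
    print(value, part["attributes"], [r["name"] for r in part["records"]])
```

Unlike the line-based routing sketch, this works on parsed records, so the header is consumed as a schema rather than treated as a data line.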

I hope this helps!

-Mark




> On Jun 2, 2017, at 8:22 AM, Andrew Grande <ap...@gmail.com> wrote:
> 
> 1 vcore, which is not even a full core (a shared and oversubscribed cpu
> core). I'm not sure what you expected to see when you raised concurrency to
> 10 :)
> 
> There are a lot of things NiFi is doing behind the scenes, especially around
> provenance recording. I don't recommend anything below 4 cores for a
> meaningful experience. If in a cloud, go to 8 cores per VM, unless you are
> designing for a low footprint with MiNiFi.
> 
> Andrew
> 
> On Fri, Jun 2, 2017, 6:30 AM Martin Eden <ma...@gmail.com> wrote:
> 
>> Thanks Andrew,
>> 
>> I have added UpdateAttribute processors to update the file names like you
>> said. Now it works, writing out 1MB files at a time (updated the
>> MergeContent MaxNumberOfEntries to 10000 to achieve that since each line in
>> my csv is 100 bytes).
>> 
>> The current flow is:
>> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
>> -> RouteOnContent -> MergeContent -> UpdateAttribute -> PutHDFS
>>                   -> MergeContent -> UpdateAttribute -> PutHDFS
>>                   -> MergeContent -> UpdateAttribute -> PutHDFS
>> 
>> So now let's talk performance.
>> 
>> With a 1 node NiFi running on a Google Compute Engine instance with 1 core
>> and 3.7 GB RAM and a 20GB disk, when I feed one 300MB zip file
>> (uncompressed 2.5GB csv text) to this flow it is basically never finishing
>> the job of transferring all the data.
>> 
>> The inbound queue of RouteOnContent is always red and the outbound queues are
>> mostly green, which indicates that this processor is the bottleneck. To
>> mitigate this I increased its number of concurrent tasks to 10 and then
>> observed: 10 tasks in progress, outbound queues temporarily red, average task
>> latency up from 2ms to 20ms, CPU on the box maxed out at 100% by the
>> NiFi java process, load average 5.
>> 
>> I then decreased the number of concurrent tasks of RouteOnContent to 5 and
>> the task average time dropped to about half as expected, with cpu still
>> 100% taken by the NiFi java process.
>> 
>> The RouteOnContent has 3 simple regexes that it applies.
>> 
>> Questions:
>> 
>> 1. Is it safe to say that I maxed out the performance of this flow on one
>> box with 1 core and 3.8 GB ram?
>> 
>> 2. The performance seems a lot lower than expected though which is
>> worrying. Is this expected? I am planning to do this at much larger scale,
>> hundreds of GBs.
>> 
>> 3. Is the RouteOnContent that I am using hitting NiFi hard? Is this not a
>> recommended use case? Is there anything obviously wrong in my flow?
>> Doing a bit of digging around in docs, presentations and other people's
>> experience it seems that NiFi's sweet spot is routing files based on
>> metadata (properties) and not really based on the actual contents of the
>> files.
>> 
>> 4. Is NiFi suitable for large-scale ETL, i.e. copying and doing simple massaging
>> of data from File System A to File System B, or from Database A to Database B?
>> This is what I am evaluating it for.
>> 
>> I do see how running this on a box with more CPU and RAM, faster disks
>> (vertical scaling) would improve the performance and then adding another
>> node to the cluster. But I want to first validate the choice of
>> benchmarking flow and understand the performance on one box.
>> 
>> Thanks a lot to everyone helping me on this thread on my NiFi
>> evaluation journey. This is a really big plus for community support of
>> NiFi.
>> 
>> M
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Jun 1, 2017 at 1:30 PM, Andrew Grande <ap...@gmail.com> wrote:
>> 
>>> It looks like your max bin size is 1000 entries and 10MB. Every time you hit
>>> those, it will write out a merged file. Update the filename attribute to be
>>> unique before writing via PutHDFS.
>>> 
>>> Andrew
>>> 
>>> On Thu, Jun 1, 2017, 2:24 AM Martin Eden <ma...@gmail.com> wrote:
>>> 
>>>> Hi Joe,
>>>> 
>>>> Thanks for the explanations. Really useful in understanding how it works.
>>>> Good to know that in the future this will be improved.
>>>> 
>>>> About the appending to HDFS issue let me recap. My flow is:
>>>> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
>>>> -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
>>>>                   -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
>>>>                   -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
>>>> 
>>>> ListHDFS is monitoring an input folder where 300MB zip files are added
>>>> periodically. Each file uncompressed is 2.5 GB csv.
>>>> 
>>>> So I am writing out to HDFS from multiple PutHDFS processors, all of them
>>>> having conflict resolution set to *APPEND* and different output folders.
>>>>
>>>> The name of the file will however be the same, *f.csv*. It gets picked up
>>>> from the name of the flow files, which bear the name of the original
>>>> uncompressed file. This happens, I think, in the MergeContent processor.
>>>>
>>>> Since all of these processors are running with 1 concurrent task, it seems
>>>> that for some reason we cannot append concurrently to HDFS even if we are
>>>> appending to different files in different folders. Any ideas how to
>>>> mitigate this?
>>>> 
>>>> It seems other people have encountered this
>>>> <https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html>
>>>> with NiFi but there is no conclusive solution. It does seem also that
>>>> appending to hdfs is somewhat problematic
>>>> <http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369>.
>>>> 
>>>> So stepping back, the reason I am doing append in the PutHDFS is because I
>>>> did not manage to find a setting in the MergeContent processors that
>>>> allows creation of multiple bundled flow files with the same root
>>>> name but different sequence numbers or timestamps (like f.csv.1, f.csv.2
>>>> ....). They all get the same name, which is f.csv. Is that possible somehow?
>>>> See my detailed MergeContent processor config below.
>>>>
>>>> So basically I have a 2.5GB csv file that eventually gets broken up into
>>>> lines, and the lines get merged together in bundles of 10 MB, but when those
>>>> bundles are emitted to the PutHDFS they have the same name as the original
>>>> file over and over again. I would like them to have a different name based
>>>> on a timestamp or sequence number, let's say, so that I can avoid the append
>>>> conflict resolution in PutHDFS which is causing me grief right now. Is that
>>>> possible?
>>>> 
>>>> Thanks,
>>>> M
>>>> 
>>>> 
>>>> Currently my MergeContent processor config is:
>>>>  <properties>
>>>>   <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value> </entry>
>>>>   <entry> <key>Merge Format</key> <value>Binary Concatenation</value> </entry>
>>>>   <entry> <key>Attribute Strategy</key> <value>Keep Only Common Attributes</value> </entry>
>>>>   <entry> <key>Correlation Attribute Name</key> </entry>
>>>>   <entry> <key>Minimum Number of Entries</key> <value>1</value> </entry>
>>>>   <entry> <key>Maximum Number of Entries</key> <value>1000</value> </entry>
>>>>   <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
>>>>   <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>
>>>>   <entry> <key>Max Bin Age</key> </entry>
>>>>   <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
>>>>   <entry> <key>Delimiter Strategy</key> <value>Text</value> </entry>
>>>>   <entry> <key>Header File</key> </entry>
>>>>   <entry> <key>Footer File</key> </entry>
>>>>   <entry> <key>Demarcator File</key> <value></value> </entry>
>>>>   <entry> <key>Compression Level</key> <value>1</value> </entry>
>>>>   <entry> <key>Keep Path</key> <value>false</value> </entry>
>>>>  </properties>
>>>> 
>>>> 
>>>> On Wed, May 31, 2017 at 3:52 PM, Joe Witt <jo...@gmail.com> wrote:
>>>> 
>>>>> Split failed before even with backpressure:
>>>>> - yes, backpressure kicks in when the destination queues for a given
>>>>> processor have reached their target size (in count of flowfiles or
>>>>> total size represented).  However, to clarify why the OOM happened, it
>>>>> is important to realize that it is not about 'flow files over a quick
>>>>> period of time' but rather 'flow files held within a single process
>>>>> session'.  Your SplitText was pulling a single flowfile but then
>>>>> creating, let's say, 1,000,000 resulting flow files and then committing
>>>>> that change.  That happens within a session.  But all those flow file
>>>>> objects (not their content) are held in memory, and at such high
>>>>> numbers that creates excessive heap usage.  The two-phase divide/conquer
>>>>> approach Koji suggested solves that, and eventually we need to solve
>>>>> it by swapping out the flowfiles to disk within a session.  We
>>>>> actually do swap out flowfiles sitting on queues after a certain
>>>>> threshold is reached for this very reason.  This means you should be
>>>>> able to have many millions of flowfiles sitting around in the flow for
>>>>> whatever reason and not hit memory problems.
>>>>> 
>>>>> Hope that helps there.
>>>>> 
>>>>> On PutHDFS it looks like possibly two things are trying to append to
>>>>> the same file?  If yes I'd really recommend not appending but rather
>>>>> use MergeContent to create data bundles of a given size then write
>>>>> those to HDFS.
>>>>> 
>>>>> Thanks
>>>>> Joe
>>>>> 
>>>>> On Wed, May 31, 2017 at 10:33 AM, Martin Eden <martineden131@gmail.com> wrote:
>>>>>> Hi Koji,
>>>>>> 
>>>>>> Good to know that it can handle large files. I thought that was the case
>>>>>> but I was just not seeing it in practice.
>>>>>>
>>>>>> Yes I am using 'Line Split Count' as 1 at SplitText.
>>>>>>
>>>>>> I added the extra SplitText processor exactly as you suggested and the OOM
>>>>>> went away. So, big thanks!!!
>>>>>>
>>>>>> However I have 2 follow-up questions:
>>>>>>
>>>>>> 1. Before adding the extra SplitText processor I also played with the
>>>>>> back-pressure settings on the outbound queue of the original SplitText
>>>>>> processor. Since you mentioned that it is generating files at a rate that
>>>>>> is too high, I figured the queue should slow it down. I tried a limit of
>>>>>> 100MB or 1000 files and I still got the OOMs in the SplitText processor.
>>>>>> Why isn't the queue back-pressure helping me in this case? Where would that
>>>>>> come in handy then? Why is the extra SplitText processor needed to fix
>>>>>> things and not just the queue back-pressure?
>>>>>> 
>>>>>> 2. I am now close to completing my flow but I am hitting another error.
>>>>>> This time it's the last stage, the PutHDFS throws
>>>>>> o.apache.nifi.processors.hadoop.PutHDFS
>>>>>> PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
>>>>>> due to org.apache.nifi.processor.exception.ProcessException: IOException
>>>>>> thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
>>>>>> See the full stacktrace below.
>>>>>> I have a parallelism of 1 for my PutHDFS processors. Any ideas why this is
>>>>>> happening?
>>>>>> 
>>>>>> Thanks,
>>>>>> Martin
>>>>>> 
>>>>>> 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
>>>>>> o.apache.nifi.processors.hadoop.PutHDFS
>>>>>> PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS due to
>>>>>> org.apache.nifi.processor.exception.ProcessException: IOException thrown
>>>>>> from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>> Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
>>>>>>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
>>>>>>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>        at java.security.AccessController.doPrivileged(Native Method)
>>>>>>        at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>> : {}
>>>>>> 
>>>>>> org.apache.nifi.processor.exception.ProcessException: IOException thrown
>>>>>> from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
>>>>>> Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
>>>>>>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
>>>>>>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>        at java.security.AccessController.doPrivileged(Native Method)
>>>>>>        at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>
>>>>>>        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2148)
>>>>>>        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2095)
>>>>>>        at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.java:293)
>>>>>>        at java.security.AccessController.doPrivileged(Native Method)
>>>>>>        at javax.security.auth.Subject.doAs(Subject.java:360)
>>>>>>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1678)
>>>>>>        at org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)
>>>>>>        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>>>>>>        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
>>>>>>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
>>>>>>        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>>>>>>        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
>>>>>>        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>        at java.lang.Thread.run(Thread.java:748)
>>>>>> 
>>>>>> Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
>>>>>> /nifi_out/unmatched/log20160930.csv for
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
>>>>>>        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
>>>>>>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
>>>>>>        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>        at java.security.AccessController.doPrivileged(Native Method)
>>>>>>        at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>
>>>>>>        at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>        at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>        at com.sun.proxy.$Proxy188.append(Unknown Source)
>>>>>>        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
>>>>>>        at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)
>>>>>>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>        at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>        at com.sun.proxy.$Proxy194.append(Unknown Source)
>>>>>>        at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
>>>>>>        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
>>>>>>        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
>>>>>>        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
>>>>>>        at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
>>>>>>        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>        at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
>>>>>>        at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
>>>>>>        at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)
>>>>>>        at org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)
>>>>>>        at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2125)
>>>>>>        ... 18 common frames omitted
>>>>>> 
>>>>>> On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <ijokarumawak@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi Martin,
>>>>>>> 
>>>>>>> Generally, a NiFi processor doesn't load the entire content of a file and
>>>>>>> is capable of handling huge files.
>>>>>>> However, having a massive number of FlowFiles can cause OOM issues, as
>>>>>>> FlowFiles and their Attributes reside on the heap.
>>>>>>>
>>>>>>> I assume you are using 'Line Split Count' as 1 at SplitText.
>>>>>>> We recommend using multiple SplitText processors so as not to generate
>>>>>>> many FlowFiles in a short period of time.
>>>>>>> For example, the 1st SplitText splits files per 5,000 lines, then the 2nd
>>>>>>> SplitText splits into individual lines.
>>>>>>> This way, we can decrease the number of FlowFiles at a given time,
>>>>>>> requiring less heap.
>>>>>>> 
>>>>>>> I hope this helps.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Koji
>>>>>>> 
>>>>>>> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <martineden131@gmail.com> wrote:
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> I have a vanilla Nifi 1.2.0 node with 1GB of heap.
>>>>>>>> 
>>>>>>>> The flow I am trying to run is:
>>>>>>>> ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent ->
>>>>>>>> PutHDFS
>>>>>>>>
>>>>>>>> When I give it a 300MB input zip file (2.5GB uncompressed) I am getting
>>>>>>>> Java OutOfMemoryError as below.
>>>>>>>>
>>>>>>>> Does NiFi read in the entire contents of files in memory? This is
>>>>>>>> unexpected. I thought it is chunking through files. Giving more ram is not
>>>>>>>> a solution as you can always get larger input files in the future.
>>>>>>>>
>>>>>>>> Does this mean NiFi is not suitable as a scalable ETL solution?
>>>>>>>>
>>>>>>>> Can someone please explain what is happening and how to mitigate large
>>>>>>>> files in NiFi? Any patterns?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> M
>>>>>>>> 
>>>>>>>> ERROR [Timer-Driven Process Thread-9]
>>>>>>>> o.a.nifi.processors.standard.SplitText
>>>>>>>> SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
>>>>>>>> SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to
>>> process
>>>>>>>> session due to java.lang.OutOfMemoryError: Java heap space: {}
>>>>>>>> 
>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>> 
>>>>>>>>        at
>> java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
>>>>>>>> 
>>>>>>>>        at java.util.HashMap.putMapEntries(HashMap.java:511)
>>>>>>>> 
>>>>>>>>        at java.util.HashMap.<init>(HashMap.java:489)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.repository.StandardFlowFileRecord$
>>>>>>> Builder.initializeAttributes(StandardFlowFileRecord.java:219)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.repository.StandardFlowFileRecord$
>>>>>>> Builder.addAttributes(StandardFlowFileRecord.java:234)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.repository.StandardProcessSession.
>>>>>>> putAllAttributes(StandardProcessSession.java:1723)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.processors.standard.SplitText.
>>>>>>> updateAttributes(SplitText.java:367)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> 
>>>> org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(
>>>>>>> SplitText.java:320)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.processors.standard.SplitText.onTrigger(
>>>>>>> SplitText.java:258)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.processor.AbstractProcessor.onTrigger(
>>>>>>> AbstractProcessor.java:27)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(
>>>>>>> StandardProcessorNode.java:1118)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
>>> call(
>>>>>>> ContinuallyRunProcessorTask.java:144)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
>>> call(
>>>>>>> ContinuallyRunProcessorTask.java:47)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.scheduling.
>>> TimerDrivenSchedulingAgent$1.
>>>>> run(
>>>>>>> TimerDrivenSchedulingAgent.java:132)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.
>>>>> call(Executors.java:511)
>>>>>>>> 
>>>>>>>>        at java.util.concurrent.FutureTask.runAndReset(
>>>>>>> FutureTask.java:308)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$
>>>>>>> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.
>>> java:180)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$
>>>>>>> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>> ThreadPoolExecutor.java:1142)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>>> ThreadPoolExecutor.java:617)
>>>>>>>> 
>>>>>>>>        at java.lang.Thread.run(Thread.java:748)
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 


Re: SplitText processor OOM larger input files

Posted by Andrew Grande <ap...@gmail.com>.
You are running on 1 vCPU, which is not even a full core (it is a shared and
oversubscribed CPU core). I'm not sure what you expected to see when you
raised concurrency to 10 :)
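
To put rough numbers on this, here is a minimal sketch of a deliberately
simplified model. It is an assumption for illustration, not a measurement of
NiFi's scheduler: if tasks are CPU-bound and the OS time-slices them fairly,
running more concurrent tasks than cores stretches each task's latency without
raising throughput. The function names are hypothetical; the 2 ms baseline is
the figure reported in the quoted message below.

```python
def modeled_latency_ms(base_latency_ms: float, concurrent_tasks: int, cores: int) -> float:
    """Estimate per-task latency for CPU-bound tasks under fair time-slicing.

    Simplifying assumption: once tasks outnumber cores, each task only gets
    a cores/tasks share of CPU time, so its latency grows by tasks/cores.
    """
    if base_latency_ms < 0 or concurrent_tasks < 1 or cores < 1:
        raise ValueError("latency must be >= 0; tasks and cores must be >= 1")
    oversubscription = max(1.0, concurrent_tasks / cores)
    return base_latency_ms * oversubscription


def modeled_throughput_per_s(base_latency_ms: float, concurrent_tasks: int, cores: int) -> float:
    """Tasks completed per second across all concurrent tasks in this model."""
    latency = modeled_latency_ms(base_latency_ms, concurrent_tasks, cores)
    return concurrent_tasks * 1000.0 / latency


if __name__ == "__main__":
    for tasks in (1, 5, 10):
        print(tasks, "task(s) on 1 core:",
              modeled_latency_ms(2.0, tasks, 1), "ms per task,",
              modeled_throughput_per_s(2.0, tasks, 1), "tasks/s")
```

In this model, 10 tasks on 1 core give 20 ms per task and 5 tasks give 10 ms,
while throughput stays flat. That is in line with the latencies reported in
the quoted message, though the real system also spends CPU on provenance and
repository work that the model ignores.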

NiFi does a lot behind the scenes, especially around provenance recording.
I don't recommend anything below 4 cores for a meaningful experience. If you
are in a cloud, go to 8 cores per VM, unless you are designing for a low
footprint with MiNiFi.

Andrew

On Fri, Jun 2, 2017, 6:30 AM Martin Eden <ma...@gmail.com> wrote:

> Thanks Andrew,
>
> I have added UpdateAttribute processors to update the file names like you
> said. Now it works, writing out 1MB files at a time (updated the
> MergeContent MaxNumberOfEntries to 10000 to achieve that since each line in
> my csv is 100 bytes).
>
> The current flow is:
> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
> -> RouteOnContent -> MergeContent -> UpdateAttribute -> PutHDFS
>                   -> MergeContent -> UpdateAttribute -> PutHDFS
>                   -> MergeContent -> UpdateAttribute -> PutHDFS
>
> So now let's talk performance.
>
> With a 1 node NiFi running on a Google Compute Engine instance with 1 core
> and 3.7 GB RAM and a 20GB disk, when I feed one 300MB zip file
> (uncompressed 2.5GB csv text) to this flow it is basically never finishing
> the job of transferring all the data.
>
> The inbound queue of RouteOnContent is always red and outbound queues are
> mostly green so that indicates that this processor is the bottleneck. To
> mitigate this I increased its number of concurrent tasks to 10 and then
> observed tasks in progress 10, outbound queues temporarily red, avg task
> latency increased from 2ms to 20ms, cpu on box maxed out to 100% by the
> NiFi java process, load avg 5.
>
> I then decreased the number of concurrent tasks of RouteOnContent to 5 and
> the task average time dropped to about half as expected, with cpu still
> 100% taken by the NiFi java process.
>
> The RouteOnContent has 3 simple regexes that it applies.
>
> Questions:
>
> 1. Is it safe to say that I maxed out the performance of this flow on one
> box with 1 core and 3.8 GB ram?
>
> 2. The performance seems a lot lower than expected though which is
> worrying. Is this expected? I am planning to do this at much larger scale,
> hundreds of GBs.
>
> 3. Is the RouteOnContent that I am using hitting NiFi hard? Is this not a
> recommended use case? Is there anything obviously wrong in my flow?
> Doing a bit of digging around in docs, presentations and other people's
> experience it seems that NiFi's sweet spot is routing files based on
> metadata (properties) and not really based on the actual contents of the
> files.
>
> 4. Is Nifi suitable for large scale ETL. Copying and doing simple massaging
> of data from File System A to File System B? From Database A to Database B?
> This is what I am evaluating it for.
>
> I do see how running this on a box with more CPU and RAM, faster disks
> (vertical scaling) would improve the performance and then adding another
> node to the cluster. But I want to first validate the choice of
> benchmarking flow and understand the performance on one box.
>
> Thanks a lot for all the people for helping me on this thread on my NiFi
> evaluation journey. This is a really big plus for community support of
> NiFi.
>
> M
>
>
>
>
>
>
> On Thu, Jun 1, 2017 at 1:30 PM, Andrew Grande <ap...@gmail.com> wrote:
>
> > It looks like your max bin size is 1000 and 10MB. Every time you hit those,
> > it will write out a merged file. Update the filename attribute to be unique
> > before writing via PutHDFS.
> >
> > Andrew
> >
> > On Thu, Jun 1, 2017, 2:24 AM Martin Eden <ma...@gmail.com>
> wrote:
> >
> > > Hi Joe,
> > >
> > > Thanks for the explanations. Really useful in understanding how it
> works.
> > > Good to know that in the future this will be improved.
> > >
> > > About the appending to HDFS issue let me recap. My flow is:
> > > ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
> > > -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
> > >                   -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
> > >                   -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
> > >
> > > ListHDFS is monitoring an input folder where 300MB zip files are added
> > > periodically. Each file uncompressed is 2.5 GB csv.
> > >
> > > So I am writing out to hdfs from multiple PutHDFS processors all of
> them
> > > having conflict resolution set to *APPEND* and different output
> folders.
> > >
> > > The name of the file will be however the same *f.csv*. It gets picked
> up
> > > from the name of the flow files which bear the name of the original
> > > uncompressed file. This happens I think in the MergeContent processor.
> > >
> > > Since all of these processors are running with 1 concurrent task, it
> > seems
> > > that we cannot append concurrently to hdfs even if we are appending to
> > > different files in different folders for some reason. Any ideas how to
> > > mitigate this?
> > >
> > > It seems other people have encountered this
> > > <
> > > https://community.hortonworks.com/questions/61096/puthdfs-
> > leaseexpiredexception-error-when-running-m.html
> > > >
> > > with NiFi but there is no conclusive solution. It does seem also that
> > > appending to hdfs is somewhat problematic
> > > <
> > > http://community.cloudera.com/t5/Storage-Random-Access-HDFS/
> > How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369
> > > >
> > > .
> > >
> > > So stepping back, the reason I am doing append in the PutHDFS is
> because
> > I
> > > did not manage to find a setting in the MergeContent processors that
> > > basically allows creation of multiple bundled flow files with the same
> > root
> > > name but different sequence numbers or timestamps (like f.csv.1,
> f.csv.2
> > > ....). They all get the same name which is f.csv. Is that possible
> > somehow?
> > > See my detailed MergeContent processor config below.
> > >
> > > So basically I have a 2.5GB csv file that eventually gets broken up in
> > > lines and the lines gets merged together in bundles of 10 MB but when
> > those
> > > bundles are emitted to the PutHDFS they have the same name as the
> > original
> > > file over and over again. I would like them to have a different name
> > based
> > > on a timestamp or sequence number let's say so that I can avoid the
> > append
> > > conflict resolution in PutHDFS which is causing me grief right now. Is
> > that
> > > possible?
> > >
> > > Thanks,
> > > M
> > >
> > >
> > > Currently my MergeContent processor config is:
> > >   <properties>
> > > *   <entry> <key>Merge Strategy</key> <value>Bin-Packing
> > Algorithm</value>
> > > </entry>*
> > > *   <entry> <key>Merge Format</key> <value>Binary Concatenation</value>
> > > </entry>*
> > >    <entry> <key>Attribute Strategy</key><value>Keep Only Common
> > > Attributes</value> </entry>
> > >    <entry> <key>Correlation Attribute Name</key> </entry>
> > >    <entry> <key>Minimum Number of Entries</key><value>1</value>
> </entry>
> > >    <entry> <key>Maximum Number of Entries</key> <value>1000</value>
> > > </entry>
> > >    <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
> > > *   <entry> <key>Maximum Group Size</key> <value>10 MB</value>
> </entry>*
> > >    <entry> <key>Max Bin Age</key> </entry>
> > >    <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
> > >    <entry> <key>Delimiter Strategy</key><value>Text</value> </entry>
> > >    <entry> <key>Header File</key> </entry>
> > >    <entry> <key>Footer File</key> </entry>
> > >    <entry> <key>Demarcator File</key> <value></value> </entry>
> > >    <entry> <key>Compression Level</key> <value>1</value></entry>
> > >    <entry> <key>Keep Path</key> <value>false</value> </entry>
> > >   </properties>
> > >
> > >
> > > On Wed, May 31, 2017 at 3:52 PM, Joe Witt <jo...@gmail.com> wrote:
> > >
> > > > Split failed before even with backpressure:
> > > > - yes that backpressure kicks in when destination queues for a given
> > > > processor have reached their target size (in count of flowfiles or
> > > > total size represented).  However, to clarify why the OOM happened it
> > > > is important to realize that it is not about 'flow files over a quick
> > > > period of time' but rather 'flow files held within a single process
> > > > session'.  Your SplitText was pulling a single flowfile but then
> > > > creating, let's say, 1,000,000 resulting flow files and then committing
> > > > that change.  That happens within a session.  But all those flow file
> > > > objects (not their content) are held in memory and at such high
> > > > numbers it creates excessive heap usage.  The two-phase divide/conquer
> > > > approach Koji suggested solves that, and eventually we need to solve
> > > > that by swapping out the flowfiles to disk within a session.  We
> > > > actually do swap out flowfiles sitting on queues after a certain
> > > > threshold is reached for this very reason.  This means you should be
> > > > able to have many millions of flowfiles sitting around in the flow
> for
> > > > whatever reason and not hit memory problems.
> > > >
> > > > Hope that helps there.
> > > >
> > > > On PutHDFS it looks like possibly two things are trying to append to
> > > > the same file?  If yes I'd really recommend not appending but rather
> > > > use MergeContent to create data bundles of a given size then write
> > > > those to HDFS.
> > > >
> > > > Thanks
> > > > Joe
> > > >
> > > > On Wed, May 31, 2017 at 10:33 AM, Martin Eden <
> martineden131@gmail.com
> > >
> > > > wrote:
> > > > > Hi Koji,
> > > > >
> > > > > Good to know that it can handle large files. I thought it was the
> > case
> > > > but
> > > > > I was just not seeing in practice.
> > > > >
> > > > > Yes I am using 'Line Split Count' as 1 at SplitText.
> > > > >
> > > > > I added the extra SplitText processor exactly as you suggested and
> > the
> > > > OOM
> > > > > went away. So, big thanks!!!
> > > > >
> > > > > However I have 2 follow-up questions:
> > > > >
> > > > > 1. Before adding the extra SplitText processor I also played with
> the
> > > > > back-pressure settings on the outbound queue of the original
> > SplitText
> > > > > processor, since you mentioned that it is generating files at a
> rate
> > > that
> > > > > is too high, I figure the queue should slow it down. I tried a
> limit
> > of
> > > > > 100MB or 1000 files and I still got the OOMs in the SplitText
> > > processor.
> > > > > Why isn't the queue back-pressure helping me in this case? Where would
> > > > > that come in handy then? Why is the extra SplitText processor needed to
> > > > > fix things and not just the queue back-pressure?
> > > > >
> > > > > 2. I am now close to completing my flow but I am hitting another
> > error.
> > > > > This time it's the last stage, the PutHDFS throws
> > > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to
> > > HDFS
> > > > > due to org.apache.nifi.processor.exception.ProcessException:
> > > IOException
> > > > > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > > > See the full stacktrace below.
> > > > > I have a parallelism of 1 for my PutHDFS processors. Any ideas why
> > this
> > > > is
> > > > > happening?
> > > > >
> > > > > Thanks,
> > > > > Martin
> > > > >
> > > > > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> > > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] F
> > > > >
> > > > > ailed to write to HDFS due to
> > > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > > thrown
> > > > > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > > >
> > > > > 5aa]:
> > > > > org.apache.hadoop.ipc.RemoteException(org.apache.
> > hadoop.hdfs.protocol.
> > > > AlreadyBeingCreatedException):
> > > > > Failed to APPEND_FILE /nifi_out/unmatched/log
> > > > >
> > > > > 20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on
> 10.128.0.7
> > > > > because DFSClient_NONMAPREDUCE_-1411681085_97 is already the
> current
> > > > lease
> > > > > holder.
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFileInternal(
> > > > FSNamesystem.java:2683)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > appendFileInt(FSNamesystem.java:2982)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > appendFile(FSNamesystem.java:2950)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > > append(NameNodeRpcServer.java:655)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > > deTranslatorPB.java:421)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > > >
> > > ClientNamenodeProtocol$2.callBlockingMethod(
> > ClientNamenodeProtocolProtos.j
> > > > >
> > > > > ava)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> > ProtoBufRpcInvoker.call(
> > > > ProtobufRpcEngine.java:616)
> > > > >
> > > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> > 2049)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> > 2045)
> > > > >
> > > > >         at java.security.AccessController.doPrivileged(Native
> > Method)
> > > > >
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > > UserGroupInformation.java:1698)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$
> > Handler.run(Server.java:2043)
> > > > >
> > > > > : {}
> > > > >
> > > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > > thrown
> > > > > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > > > > org.apache.hadoop.ipc.Re
> > > > >
> > > > > moteException(org.apache.hadoop.hdfs.protocol.
> > > > AlreadyBeingCreatedException):
> > > > > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > > > DFSClient_NON
> > > > >
> > > > > MAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > > holder.
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFileInternal(
> > > > FSNamesystem.java:2683)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > appendFileInt(FSNamesystem.java:2982)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > appendFile(FSNamesystem.java:2950)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > > append(NameNodeRpcServer.java:655)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > > deTranslatorPB.java:421)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > > >
> > > ClientNamenodeProtocol$2.callBlockingMethod(
> > ClientNamenodeProtocolProtos.j
> > > > >
> > > > > ava)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> > ProtoBufRpcInvoker.call(
> > > > ProtobufRpcEngine.java:616)
> > > > >
> > > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> > 2049)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> > 2045)
> > > > >
> > > > >         at java.security.AccessController.doPrivileged(Native
> > Method)
> > > > >
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > > UserGroupInformation.java:1698)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$
> > Handler.run(Server.java:2043)
> > > > >
> > > > >
> > > > >         at
> > > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > > StandardProcessSession.java:2148)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > > StandardProcessSession.java:2095)
> > > > >
> > > > >         at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.
> > > > java:293)
> > > > >
> > > > >         at java.security.AccessController.doPrivileged(Native
> > Method)
> > > > >
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:360)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > > UserGroupInformation.java:1678)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(
> > PutHDFS.java:223)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> > > > AbstractProcessor.java:27)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> > > > StandardProcessorNode.java:1118)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > > > ContinuallyRunProcessorTask.java:144)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > > > ContinuallyRunProcessorTask.java:47)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.
> > run(
> > > > TimerDrivenSchedulingAgent.java:132)
> > > > >
> > > > >         at
> > > > > java.util.concurrent.Executors$RunnableAdapter.
> > call(Executors.java:511)
> > > > >
> > > > >         at java.util.concurrent.FutureTask.runAndReset(
> > > > FutureTask.java:308)
> > > > >
> > > > >         at
> > > > > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > > >
> > > > >         at
> > > > > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > > >
> > > > >         at
> > > > > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > > ThreadPoolExecutor.java:1142)
> > > > >
> > > > >         at
> > > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > > ThreadPoolExecutor.java:617)
> > > > >
> > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > >
> > > > > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to
> > APPEND_FILE
> > > > > /nifi_out/unmatched/log20160930.csv for
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > > holder.
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFileInternal(
> > > > FSNamesystem.java:2683)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > appendFileInt(FSNamesystem.java:2982)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > > appendFile(FSNamesystem.java:2950)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > > append(NameNodeRpcServer.java:655)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > > deTranslatorPB.java:421)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > > > ClientNamenodeProtocol$2.callBlockingMethod(
> > ClientNamenodeProtocolProtos.
> > > > java)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> > ProtoBufRpcInvoker.call(
> > > > ProtobufRpcEngine.java:616)
> > > > >
> > > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> > 2049)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> > 2045)
> > > > >
> > > > >         at java.security.AccessController.doPrivileged(Native
> > Method)
> > > > >
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > > UserGroupInformation.java:1698)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Server$
> > Handler.run(Server.java:2043)
> > > > >
> > > > >
> > > > >         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> > > > >
> > > > >         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> > > > invoke(ProtobufRpcEngine.java:229)
> > > > >
> > > > >         at com.sun.proxy.$Proxy188.append(Unknown Source)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> > > > orPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> > > > >
> > > > >         at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown
> > > Source)
> > > > >
> > > > >         at
> > > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > > > DelegatingMethodAccessorImpl.java:43)
> > > > >
> > > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> > > > RetryInvocationHandler.java:191)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> > > > RetryInvocationHandler.java:102)
> > > > >
> > > > >         at com.sun.proxy.$Proxy194.append(Unknown Source)
> > > > >
> > > > >         at org.apache.hadoop.hdfs.DFSClient.callAppend(
> > > > DFSClient.java:1808)
> > > > >
> > > > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.
> > java:1877)
> > > > >
> > > > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.
> > java:1847)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> > > > doCall(DistributedFileSystem.java:340)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> > > > doCall(DistributedFileSystem.java:336)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> > > > FileSystemLinkResolver.java:81)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> > > > DistributedFileSystem.java:348)
> > > > >
> > > > >         at
> > > > > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> > > > DistributedFileSystem.java:318)
> > > > >
> > > > >         at org.apache.hadoop.fs.FileSystem.append(FileSystem.
> > java:1176)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(
> > PutHDFS.java:301)
> > > > >
> > > > >         at
> > > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > > StandardProcessSession.java:2125)
> > > > >
> > > > >         ... 18 common frames omitted
> > > > >

Re: SplitText processor OOM larger input files

Posted by Martin Eden <ma...@gmail.com>.
Thanks Andrew,

I have added UpdateAttribute processors to make the file names unique, as you
suggested. Now it works, writing out 1MB files at a time (I raised the
MergeContent Maximum Number of Entries to 10000 to achieve that, since each
line in my csv is 100 bytes).
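
As a rough check of those numbers, here is a minimal sketch of how the merged
file size follows from the MergeContent limits. It assumes a bin is written
out as soon as either Maximum Number of Entries or Maximum Group Size is
reached, that every line is exactly 100 bytes, and that "10 MB" means
10 * 1024 * 1024 bytes. The function name is made up for illustration; this is
not NiFi code.

```python
def merged_file_bytes(max_entries: int, max_group_bytes: int, line_bytes: int) -> int:
    """Approximate size of one merged file, assuming the bin closes at the first limit hit."""
    # Size reached when the entry-count limit is hit first.
    by_count = max_entries * line_bytes
    # Size reached when the group-size limit is hit first (whole lines only).
    by_size = (max_group_bytes // line_bytes) * line_bytes
    return min(by_count, by_size)


TEN_MB = 10 * 1024 * 1024  # assumption: binary megabytes

print(merged_file_bytes(1000, TEN_MB, 100))   # earlier config: 100000 bytes per file
print(merged_file_bytes(10000, TEN_MB, 100))  # current config: 1000000 bytes per file
```

Under these assumptions the entry limit, not the 10 MB group size, is what
closes each bin in both configurations.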

The current flow is:
ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
-> RouteOnContent -> MergeContent -> UpdateAttribute -> PutHDFS
                  -> MergeContent -> UpdateAttribute -> PutHDFS
                  -> MergeContent -> UpdateAttribute -> PutHDFS

So now let's talk performance.

With a 1 node NiFi running on a Google Compute Engine instance with 1 core,
3.7 GB RAM and a 20GB disk, when I feed one 300MB zip file (uncompressed
2.5GB csv text) to this flow, it basically never finishes transferring all
the data.

The inbound queue of RouteOnContent is always red and its outbound queues are
mostly green, which indicates that this processor is the bottleneck. To
mitigate this I increased its number of concurrent tasks to 10 and then
observed: 10 tasks in progress, outbound queues temporarily red, avg task
latency up from 2ms to 20ms, cpu on the box maxed out at 100% by the NiFi
java process, and a load avg of 5.

I then decreased the number of concurrent tasks of RouteOnContent to 5 and
the average task time dropped to about half, as expected, with the cpu still
100% taken by the NiFi java process.

The RouteOnContent has 3 simple regexes that it applies.
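
To put the figures above in perspective, here is a back-of-the-envelope
estimate of how long RouteOnContent would need for one input file. It is only
a sketch under assumptions: 2.5GB is taken as 2.5e9 bytes, every line is 100
bytes and becomes its own flow file, each task handles exactly one flow file,
and the reported average task latency is the time a task really takes. The
function name is hypothetical.

```python
def estimated_hours(total_bytes: float, line_bytes: int,
                    seconds_per_task: float, concurrent_tasks: int = 1) -> float:
    """Rough time to route every line, assuming one flow file per line and per task."""
    flow_files = total_bytes / line_bytes
    total_seconds = flow_files * seconds_per_task / concurrent_tasks
    return total_seconds / 3600


# 1 concurrent task at 2ms per task
print(round(estimated_hours(2.5e9, 100, 0.002, 1), 1))
# 10 concurrent tasks at 20ms per task
print(round(estimated_hours(2.5e9, 100, 0.020, 10), 1))
```

Both cases come out at roughly 14 hours per file. Under these assumptions the
extra concurrent tasks on a single core buy nothing, because the per-task
latency grows by the same factor as the task count.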

Questions:

1. Is it safe to say that I have maxed out the performance of this flow on
one box with 1 core and 3.7 GB RAM?

2. The performance seems a lot lower than expected, though, which is
worrying. Is this expected? I am planning to do this at a much larger scale,
hundreds of GBs.

3. Is the RouteOnContent that I am using hitting NiFi hard? Is this not a
recommended use case? Is there anything obviously wrong in my flow?
Doing a bit of digging around in docs, presentations and other people's
experience it seems that NiFi's sweet spot is routing files based on
metadata (properties) and not really based on the actual contents of the
files.

4. Is NiFi suitable for large-scale ETL, i.e. copying and doing simple
massaging of data from File System A to File System B, or from Database A to
Database B? This is what I am evaluating it for.

I do see how running this on a box with more CPU, more RAM and faster disks
(vertical scaling), and then adding another node to the cluster, would
improve the performance. But I want to first validate the choice of
benchmarking flow and understand the performance on one box.

Thanks a lot to everyone who has helped me on this thread during my NiFi
evaluation journey. This is a really big plus for the community support of
NiFi.

M






On Thu, Jun 1, 2017 at 1:30 PM, Andrew Grande <ap...@gmail.com> wrote:

> It looks like your max bin size is 1000 and 10MB. Every time you hit those,
> it will write out a merged file. Update the filename attribute to be unique
> before writing via PutHDFS.
>
> Andrew
>
> On Thu, Jun 1, 2017, 2:24 AM Martin Eden <ma...@gmail.com> wrote:
>
> > Hi Joe,
> >
> > Thanks for the explanations. Really useful in understanding how it works.
> > Good to know that in the future this will be improved.
> >
> > About the appending to HDFS issue let me recap. My flow is:
> > ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
> > -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
> >                   -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
> >                   -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
> >
> > ListHDFS is monitoring an input folder where 300MB zip files are added
> > periodically. Each file uncompressed is 2.5 GB csv.
> >
> > So I am writing out to hdfs from multiple PutHDFS processors all of them
> > having conflict resolution set to *APPEND* and different output folders.
> >
> > The name of the file will be however the same *f.csv*. It gets picked up
> > from the name of the flow files which bear the name of the original
> > uncompressed file. This happens I think in the MergeContent processor.
> >
> > Since all of these processors are running with 1 concurrent task, it
> > seems that we cannot append concurrently to hdfs even if we are appending
> > to different files in different folders for some reason. Any ideas how to
> > mitigate this?
> >
> > It seems other people have encountered this
> > <
> > https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html
> > >
> > with NiFi but there is no conclusive solution. It does seem also that
> > appending to hdfs is somewhat problematic
> > <
> > http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369
> > >
> > .
> >
> > So stepping back, the reason I am doing append in the PutHDFS is because
> > I did not manage to find a setting in the MergeContent processors that
> > basically allows creation of multiple bundled flow files with the same
> > root name but different sequence numbers or timestamps (like f.csv.1,
> > f.csv.2 ....). They all get the same name which is f.csv. Is that
> > possible somehow?
> > See my detailed MergeContent processor config below.
> >
> > So basically I have a 2.5GB csv file that eventually gets broken up in
> > lines and the lines gets merged together in bundles of 10 MB but when
> > those bundles are emitted to the PutHDFS they have the same name as the
> > original file over and over again. I would like them to have a different
> > name based on a timestamp or sequence number let's say so that I can
> > avoid the append conflict resolution in PutHDFS which is causing me grief
> > right now. Is that possible?
> >
> > Thanks,
> > M
> >
> >
> > Currently my MergeContent processor config is:
> >   <properties>
> > *   <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value>
> > </entry>*
> > *   <entry> <key>Merge Format</key> <value>Binary Concatenation</value>
> > </entry>*
> >    <entry> <key>Attribute Strategy</key><value>Keep Only Common
> > Attributes</value> </entry>
> >    <entry> <key>Correlation Attribute Name</key> </entry>
> >    <entry> <key>Minimum Number of Entries</key><value>1</value> </entry>
> >    <entry> <key>Maximum Number of Entries</key> <value>1000</value>
> > </entry>
> >    <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
> > *   <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>*
> >    <entry> <key>Max Bin Age</key> </entry>
> >    <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
> >    <entry> <key>Delimiter Strategy</key><value>Text</value> </entry>
> >    <entry> <key>Header File</key> </entry>
> >    <entry> <key>Footer File</key> </entry>
> >    <entry> <key>Demarcator File</key> <value></value> </entry>
> >    <entry> <key>Compression Level</key> <value>1</value></entry>
> >    <entry> <key>Keep Path</key> <value>false</value> </entry>
> >   </properties>
> >
> >
> > On Wed, May 31, 2017 at 3:52 PM, Joe Witt <jo...@gmail.com> wrote:
> >
> > > Split failed before even with backpressure:
> > > - yes that backpressure kicks in when destination queues for a given
> > > processor have reached their target size (in count of flowfiles or
> > > total size represented).  However, to clarify why the OOM happened it
> > > is important to realize that it is not about 'flow files over a quick
> > > period of time' but rather 'flow files held within a single process
> > > session'.  Your SplitText was pulling a single flowfile but then
> > > creating let's say 1,000,000 resulting flow files and then committing
> > > that change.  That happens within a session.  But all those flow file
> > > objects (not their content) are held in memory and at such high
> > > numbers it creates excessive heap usage.  The two phase divide/conquer
> > > approach Koji suggested solves that and eventually we need to solve
> > > that by swapping out the flowfiles to disk within a session.  We
> > > actually do swap out flowfiles sitting on queues after a certain
> > > threshold is reached for this very reason.  This means you should be
> > > able to have many millions of flowfiles sitting around in the flow for
> > > whatever reason and not hit memory problems.
> > >
> > > Hope that helps there.
> > >
> > > On PutHDFS it looks like possibly two things are trying to append to
> > > the same file?  If yes I'd really recommend not appending but rather
> > > use MergeContent to create data bundles of a given size then write
> > > those to HDFS.
> > >
> > > Thanks
> > > Joe
> > >
> > > On Wed, May 31, 2017 at 10:33 AM, Martin Eden <martineden131@gmail.com
> >
> > > wrote:
> > > > Hi Koji,
> > > >
> > > > Good to know that it can handle large files. I thought it was the
> > > > case but I was just not seeing it in practice.
> > > >
> > > > Yes I am using 'Line Split Count' as 1 at SplitText.
> > > >
> > > > I added the extra SplitText processor exactly as you suggested and
> > > > the OOM went away. So, big thanks!!!
> > > >
> > > > However I have 2 follow-up questions:
> > > >
> > > > 1. Before adding the extra SplitText processor I also played with the
> > > > back-pressure settings on the outbound queue of the original SplitText
> > > > processor, since you mentioned that it is generating files at a rate
> > > > that is too high, I figure the queue should slow it down. I tried a
> > > > limit of 100MB or 1000 files and I still got the OOMs in the SplitText
> > > > processor. Why isn't the queue back-pressure helping me in this case?
> > > > Where would that come in handy then? Why is the extra SplitText
> > > > processor needed to fix things and not just the queue back-pressure?
> > > >
> > > > 2. I am now close to completing my flow but I am hitting another
> > > > error. This time it's the last stage, the PutHDFS throws
> > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to
> > > > HDFS due to org.apache.nifi.processor.exception.ProcessException:
> > > > IOException thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > > See the full stacktrace below.
> > > > I have a parallelism of 1 for my PutHDFS processors. Any ideas why
> > > > this is happening?
> > > >
> > > > Thanks,
> > > > Martin
> > > >
> > > > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] F
> > > >
> > > > ailed to write to HDFS due to
> > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > thrown
> > > > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > >
> > > > 5aa]:
> > > > org.apache.hadoop.ipc.RemoteException(org.apache.
> hadoop.hdfs.protocol.
> > > AlreadyBeingCreatedException):
> > > > Failed to APPEND_FILE /nifi_out/unmatched/log
> > > >
> > > > 20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7
> > > > because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current
> > > lease
> > > > holder.
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInternal(
> > > FSNamesystem.java:2683)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFileInt(FSNamesystem.java:2982)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFile(FSNamesystem.java:2950)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > append(NameNodeRpcServer.java:655)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > deTranslatorPB.java:421)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > >
> > ClientNamenodeProtocol$2.callBlockingMethod(
> ClientNamenodeProtocolProtos.j
> > > >
> > > > ava)
> > > >
> > > >         at
> > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> ProtoBufRpcInvoker.call(
> > > ProtobufRpcEngine.java:616)
> > > >
> > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2049)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2045)
> > > >
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >
> > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > >         at
> > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1698)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$
> Handler.run(Server.java:2043)
> > > >
> > > > : {}
> > > >
> > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > thrown
> > > > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > > > org.apache.hadoop.ipc.Re
> > > >
> > > > moteException(org.apache.hadoop.hdfs.protocol.
> > > AlreadyBeingCreatedException):
> > > > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > > DFSClient_NON
> > > >
> > > > MAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > holder.
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInternal(
> > > FSNamesystem.java:2683)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFileInt(FSNamesystem.java:2982)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFile(FSNamesystem.java:2950)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > append(NameNodeRpcServer.java:655)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > deTranslatorPB.java:421)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > >
> > ClientNamenodeProtocol$2.callBlockingMethod(
> ClientNamenodeProtocolProtos.j
> > > >
> > > > ava)
> > > >
> > > >         at
> > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> ProtoBufRpcInvoker.call(
> > > ProtobufRpcEngine.java:616)
> > > >
> > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2049)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2045)
> > > >
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >
> > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > >         at
> > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1698)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$
> Handler.run(Server.java:2043)
> > > >
> > > >
> > > >         at
> > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > StandardProcessSession.java:2148)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > StandardProcessSession.java:2095)
> > > >
> > > >         at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.
> > > java:293)
> > > >
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >
> > > >         at javax.security.auth.Subject.doAs(Subject.java:360)
> > > >
> > > >         at
> > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1678)
> > > >
> > > >         at
> > > > org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(
> PutHDFS.java:223)
> > > >
> > > >         at
> > > > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> > > AbstractProcessor.java:27)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> > > StandardProcessorNode.java:1118)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > > ContinuallyRunProcessorTask.java:144)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > > ContinuallyRunProcessorTask.java:47)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.
> run(
> > > TimerDrivenSchedulingAgent.java:132)
> > > >
> > > >         at
> > > > java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
> > > >
> > > >         at java.util.concurrent.FutureTask.runAndReset(
> > > FutureTask.java:308)
> > > >
> > > >         at
> > > > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > >
> > > >         at
> > > > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > >
> > > >         at
> > > > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > ThreadPoolExecutor.java:1142)
> > > >
> > > >         at
> > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > ThreadPoolExecutor.java:617)
> > > >
> > > >         at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to
> APPEND_FILE
> > > > /nifi_out/unmatched/log20160930.csv for
> > > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > holder.
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInternal(
> > > FSNamesystem.java:2683)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFileInt(FSNamesystem.java:2982)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFile(FSNamesystem.java:2950)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > append(NameNodeRpcServer.java:655)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > deTranslatorPB.java:421)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > > ClientNamenodeProtocol$2.callBlockingMethod(
> ClientNamenodeProtocolProtos.
> > > java)
> > > >
> > > >         at
> > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> ProtoBufRpcInvoker.call(
> > > ProtobufRpcEngine.java:616)
> > > >
> > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2049)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2045)
> > > >
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >
> > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > >         at
> > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1698)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$
> Handler.run(Server.java:2043)
> > > >
> > > >
> > > >         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> > > >
> > > >         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> > > >
> > > >         at
> > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> > > invoke(ProtobufRpcEngine.java:229)
> > > >
> > > >         at com.sun.proxy.$Proxy188.append(Unknown Source)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> > > orPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> > > >
> > > >         at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown
> > Source)
> > > >
> > > >         at
> > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > > DelegatingMethodAccessorImpl.java:43)
> > > >
> > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > >
> > > >         at
> > > > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> > > RetryInvocationHandler.java:191)
> > > >
> > > >         at
> > > > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> > > RetryInvocationHandler.java:102)
> > > >
> > > >         at com.sun.proxy.$Proxy194.append(Unknown Source)
> > > >
> > > >         at org.apache.hadoop.hdfs.DFSClient.callAppend(
> > > DFSClient.java:1808)
> > > >
> > > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.
> java:1877)
> > > >
> > > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.
> java:1847)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> > > doCall(DistributedFileSystem.java:340)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> > > doCall(DistributedFileSystem.java:336)
> > > >
> > > >         at
> > > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> > > FileSystemLinkResolver.java:81)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> > > DistributedFileSystem.java:348)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> > > DistributedFileSystem.java:318)
> > > >
> > > >         at org.apache.hadoop.fs.FileSystem.append(FileSystem.
> java:1176)
> > > >
> > > >         at
> > > > org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(
> PutHDFS.java:301)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > StandardProcessSession.java:2125)
> > > >
> > > >         ... 18 common frames omitted
> > > >
> > > > On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <
> > ijokarumawak@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Martin,
> > > >>
> > > >> Generally, NiFi processor doesn't load entire content of file and is
> > > >> capable of handling huge files.
> > > >> However, having massive amount of FlowFiles can cause OOM issue as
> > > >> FlowFiles and its Attributes resides on heap.
> > > >>
> > > >> I assume you are using 'Line Split Count' as 1 at SplitText.
> > > > >> We recommend to use multiple SplitText processors to not generate
> > > > >> many FlowFiles in a short period of time.
> > > > >> For example, 1st SplitText splits files per 5,000 lines, then the
> > > > >> 2nd SplitText splits into each line.
> > > >> This way, we can decrease number of FlowFiles at a given time
> > > >> requiring less heap.
> > > >>
> > > >> I hope this helps.
> > > >>
> > > >> Thanks,
> > > >> Koji
> > > >>
> > > >> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <
> martineden131@gmail.com
> > >
> > > >> wrote:
> > > >> > Hi all,
> > > >> >
> > > >> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
> > > >> >
> > > >> > The flow I am trying to run is:
> > > >> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent ->
> MergeContent
> > > ->
> > > >> > PutHDFS
> > > >> >
> > > >> > When I give it a 300MB input zip file (2.5GB uncompressed) I am
> > > getting
> > > >> > Java OutOfMemoryError as below.
> > > >> >
> > > >> > Does NiFi read in the entire contents of files in memory? This is
> > > >> > unexpected. I thought it is chunking through files. Giving more
> ram
> > is
> > > >> not
> > > >> > a solution as you can always get larger input files in the future.
> > > >> >
> > > >> > Does this mean NiFi is not suitable as a scalable ETL solution?
> > > >> >
> > > >> > Can someone please explain what is happening and how to mitigate
> > large
> > > >> > files in NiFi? Any patterns?
> > > >> >
> > > >> > Thanks,
> > > >> > M
> > > >> >
> > > >> > ERROR [Timer-Driven Process Thread-9]
> > > >> > o.a.nifi.processors.standard.SplitText
> > > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
> > > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to
> process
> > > >> > session due to java.lang.OutOfMemoryError: Java heap space: {}
> > > >> >
> > > >> > java.lang.OutOfMemoryError: Java heap space
> > > >> >
> > > >> >         at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> > > >> >
> > > >> >         at java.util.HashMap.putMapEntries(HashMap.java:511)
> > > >> >
> > > >> >         at java.util.HashMap.<init>(HashMap.java:489)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
> > > >> Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
> > > >> Builder.addAttributes(StandardFlowFileRecord.java:234)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.repository.StandardProcessSession.
> > > >> putAllAttributes(StandardProcessSession.java:1723)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.processors.standard.SplitText.
> > > >> updateAttributes(SplitText.java:367)
> > > >> >
> > > >> >         at
> > > >> >
> > org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(
> > > >> SplitText.java:320)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.processors.standard.SplitText.onTrigger(
> > > >> SplitText.java:258)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> > > >> AbstractProcessor.java:27)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> > > >> StandardProcessorNode.java:1118)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> call(
> > > >> ContinuallyRunProcessorTask.java:144)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> call(
> > > >> ContinuallyRunProcessorTask.java:47)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.scheduling.
> TimerDrivenSchedulingAgent$1.
> > > run(
> > > >> TimerDrivenSchedulingAgent.java:132)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.Executors$RunnableAdapter.
> > > call(Executors.java:511)
> > > >> >
> > > >> >         at java.util.concurrent.FutureTask.runAndReset(
> > > >> FutureTask.java:308)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > >> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.
> java:180)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > >> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > >> ThreadPoolExecutor.java:1142)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > >> ThreadPoolExecutor.java:617)
> > > >> >
> > > >> >         at java.lang.Thread.run(Thread.java:748)
> > > >>
> > >
> >
>

Re: SplitText processor OOM larger input files

Posted by Andrew Grande <ap...@gmail.com>.
It looks like your max bin size is 1000 entries and 10MB. Every time you hit
either of those, it will write out a merged file. Update the filename
attribute to be unique before writing via PutHDFS.
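
The idea can be sketched outside NiFi as follows. This is only an
illustration of the naming scheme, with a hypothetical helper name. In the
flow itself the equivalent would presumably be an UpdateAttribute processor
that sets filename to an Expression Language value along the lines of
${filename}.${UUID()}; treat that exact expression as an assumption to verify
against the Expression Language guide.

```python
import uuid


def unique_name(original: str) -> str:
    """Append a random UUID so that every merged bundle gets a distinct file name."""
    return f"{original}.{uuid.uuid4()}"


# Every bundle derived from f.csv now gets its own name, so PutHDFS never
# has to append to an existing file.
names = {unique_name("f.csv") for _ in range(1000)}
print(len(names))
```

With distinct names, the conflict resolution in PutHDFS no longer needs to be
set to append.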

Andrew

On Thu, Jun 1, 2017, 2:24 AM Martin Eden <ma...@gmail.com> wrote:

> Hi Joe,
>
> Thanks for the explanations. Really useful in understanding how it works.
> Good to know that in the future this will be improved.
>
> About the appending to HDFS issue let me recap. My flow is:
> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
> -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
>                   -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
>                   -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
>
> ListHDFS is monitoring an input folder where 300MB zip files are added
> periodically. Each file uncompressed is 2.5 GB csv.
>
> So I am writing out to hdfs from multiple PutHDFS processors all of them
> having conflict resolution set to *APPEND* and different output folders.
>
> The name of the file will be however the same *f.csv*. It gets picked up
> from the name of the flow files which bear the name of the original
> uncompressed file. This happens I think in the MergeContent processor.
>
> Since all of these processors are running with 1 concurrent task, it seems
> that we cannot append concurrently to hdfs even if we are appending to
> different files in different folders for some reason. Any ideas how to
> mitigate this?
>
> It seems other people have encountered this
> <
> https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html
> >
> with NiFi but there is no conclusive solution. It does seem also that
> appending to hdfs is somewhat problematic
> <
> http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369
> >
> .
>
> So stepping back, the reason I am doing append in the PutHDFS is because I
> did not manage to find a setting in the MergeContent processors that
> basically allows creation of multiple bundled flow files with the same root
> name but different sequence numbers or timestamps (like f.csv.1, f.csv.2
> ....). They all get the same name which is f.csv. Is that possible somehow?
> See my detailed MergeContent processor config below.
>
> So basically I have a 2.5GB csv file that eventually gets broken up in
> lines and the lines gets merged together in bundles of 10 MB but when those
> bundles are emitted to the PutHDFS they have the same name as the original
> file over and over again. I would like them to have a different name based
> on a timestamp or sequence number let's say so that I can avoid the append
> conflict resolution in PutHDFS which is causing me grief right now. Is that
> possible?
>
> Thanks,
> M
>
>
> Currently my MergeContent processor config is:
>   <properties>
> *   <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value>
> </entry>*
> *   <entry> <key>Merge Format</key> <value>Binary Concatenation</value>
> </entry>*
>    <entry> <key>Attribute Strategy</key><value>Keep Only Common
> Attributes</value> </entry>
>    <entry> <key>Correlation Attribute Name</key> </entry>
>    <entry> <key>Minimum Number of Entries</key><value>1</value> </entry>
>    <entry> <key>Maximum Number of Entries</key> <value>1000</value>
> </entry>
>    <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
> *   <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>*
>    <entry> <key>Max Bin Age</key> </entry>
>    <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
>    <entry> <key>Delimiter Strategy</key><value>Text</value> </entry>
>    <entry> <key>Header File</key> </entry>
>    <entry> <key>Footer File</key> </entry>
>    <entry> <key>Demarcator File</key> <value></value> </entry>
>    <entry> <key>Compression Level</key> <value>1</value></entry>
>    <entry> <key>Keep Path</key> <value>false</value> </entry>
>   </properties>
>
>
> On Wed, May 31, 2017 at 3:52 PM, Joe Witt <jo...@gmail.com> wrote:
>
> > Split failed before even with backpressure:
> > - yes that backpressure kicks in when destination queues for a given
> > processor have reached their target size (in count of flowfiles or
> > total size represented).  However, to clarify why the OOM happened it
> > is important to realize that it is not about 'flow files over a quick
> > period of time' but rather 'flow files held within a single process
> > session'.  Your SplitText was pulling a single flowfile but then
> > creating, let's say, 1,000,000 resulting flow files and then committing
> > that change.  That happens within a session.  But all those flow file
> > objects (not their content) are held in memory and at such high
> > numbers it creates excessive heap usage.  The two phase divide/conquer
> > approach Koji suggested solves that and eventually we need to solve
> > that by swapping out the flowfiles to disk within a session.  We
> > actually do swap out flowfiles sitting on queues after a certain
> > threshold is reached for this very reason.  This means you should be
> > able to have many millions of flowfiles sitting around in the flow for
> > whatever reason and not hit memory problems.
> >
> > Hope that helps there.
> >
> > On PutHDFS it looks like possibly two things are trying to append to
> > the same file?  If yes I'd really recommend not appending but rather
> > use MergeContent to create data bundles of a given size then write
> > those to HDFS.
> >
> > Thanks
> > Joe
> >
> > On Wed, May 31, 2017 at 10:33 AM, Martin Eden <ma...@gmail.com>
> > wrote:
> > > Hi Koji,
> > >
> > > Good to know that it can handle large files. I thought it was the case
> > > but I was just not seeing in practice.
> > >
> > > Yes I am using 'Line Split Count' as 1 at SplitText.
> > >
> > > I added the extra SplitText processor exactly as you suggested and the
> > > OOM went away. So, big thanks!!!
> > >
> > > However I have 2 follow-up questions:
> > >
> > > 1. Before adding the extra SplitText processor I also played with the
> > > back-pressure settings on the outbound queue of the original SplitText
> > > processor, since you mentioned that it is generating files at a rate
> > > that is too high, I figure the queue should slow it down. I tried a
> > > limit of 100MB or 1000 files and I still got the OOMs in the SplitText
> > > processor. Why isn't the queue back-pressure helping me in this case?
> > > Where would that come in handy then? Why is the extra SplitText
> > > processor needed to fix things and not just the queue back-pressure?
> > >
> > > 2. I am now close to completing my flow but I am hitting another error.
> > > This time it's the last stage, the PutHDFS throws
> > > o.apache.nifi.processors.hadoop.PutHDFS
> > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to
> > > HDFS due to org.apache.nifi.processor.exception.ProcessException:
> > > IOException thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > See the full stacktrace below.
> > > I have a parallelism of 1 for my PutHDFS processors. Any ideas why this
> > > is happening?
> > >
> > > Thanks,
> > > Martin
> > >
> > > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> > > o.apache.nifi.processors.hadoop.PutHDFS
> > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] F
> > >
> > > ailed to write to HDFS due to
> > > org.apache.nifi.processor.exception.ProcessException: IOException
> thrown
> > > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > >
> > > 5aa]:
> > > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.
> > AlreadyBeingCreatedException):
> > > Failed to APPEND_FILE /nifi_out/unmatched/log
> > >
> > > 20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7
> > > because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current
> > lease
> > > holder.
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > recoverLeaseInternal(FSNamesystem.java:2882)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(
> > FSNamesystem.java:2683)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFileInt(FSNamesystem.java:2982)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFile(FSNamesystem.java:2950)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > append(NameNodeRpcServer.java:655)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > deTranslatorPB.java:421)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> >
> ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.j
> > >
> > > ava)
> > >
> > >         at
> > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> > ProtobufRpcEngine.java:616)
> > >
> > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > >
> > >         at java.security.AccessController.doPrivileged(Native Method)
> > >
> > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > >
> > >         at
> > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > UserGroupInformation.java:1698)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > >
> > > : {}
> > >
> > > org.apache.nifi.processor.exception.ProcessException: IOException
> thrown
> > > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > > org.apache.hadoop.ipc.Re
> > >
> > > moteException(org.apache.hadoop.hdfs.protocol.
> > AlreadyBeingCreatedException):
> > > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > DFSClient_NON
> > >
> > > MAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > holder.
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > recoverLeaseInternal(FSNamesystem.java:2882)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(
> > FSNamesystem.java:2683)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFileInt(FSNamesystem.java:2982)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFile(FSNamesystem.java:2950)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > append(NameNodeRpcServer.java:655)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > deTranslatorPB.java:421)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> >
> ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.j
> > >
> > > ava)
> > >
> > >         at
> > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> > ProtobufRpcEngine.java:616)
> > >
> > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > >
> > >         at java.security.AccessController.doPrivileged(Native Method)
> > >
> > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > >
> > >         at
> > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > UserGroupInformation.java:1698)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > >
> > >
> > >         at
> > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > StandardProcessSession.java:2148)
> > >
> > >         at
> > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > StandardProcessSession.java:2095)
> > >
> > >         at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.
> > java:293)
> > >
> > >         at java.security.AccessController.doPrivileged(Native Method)
> > >
> > >         at javax.security.auth.Subject.doAs(Subject.java:360)
> > >
> > >         at
> > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > UserGroupInformation.java:1678)
> > >
> > >         at
> > > org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)
> > >
> > >         at
> > > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> > AbstractProcessor.java:27)
> > >
> > >         at
> > > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> > StandardProcessorNode.java:1118)
> > >
> > >         at
> > > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > ContinuallyRunProcessorTask.java:144)
> > >
> > >         at
> > > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > ContinuallyRunProcessorTask.java:47)
> > >
> > >         at
> > > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(
> > TimerDrivenSchedulingAgent.java:132)
> > >
> > >         at
> > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > >
> > >         at java.util.concurrent.FutureTask.runAndReset(
> > FutureTask.java:308)
> > >
> > >         at
> > > java.util.concurrent.ScheduledThreadPoolExecutor$
> > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > >
> > >         at
> > > java.util.concurrent.ScheduledThreadPoolExecutor$
> > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > >
> > >         at
> > > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1142)
> > >
> > >         at
> > > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:617)
> > >
> > >         at java.lang.Thread.run(Thread.java:748)
> > >
> > > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
> > > /nifi_out/unmatched/log20160930.csv for
> > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > holder.
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > recoverLeaseInternal(FSNamesystem.java:2882)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(
> > FSNamesystem.java:2683)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFileInt(FSNamesystem.java:2982)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > appendFile(FSNamesystem.java:2950)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > append(NameNodeRpcServer.java:655)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > deTranslatorPB.java:421)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.
> > java)
> > >
> > >         at
> > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> > ProtobufRpcEngine.java:616)
> > >
> > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > >
> > >         at java.security.AccessController.doPrivileged(Native Method)
> > >
> > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > >
> > >         at
> > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > UserGroupInformation.java:1698)
> > >
> > >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > >
> > >
> > >         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> > >
> > >         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> > >
> > >         at
> > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> > invoke(ProtobufRpcEngine.java:229)
> > >
> > >         at com.sun.proxy.$Proxy188.append(Unknown Source)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> > orPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> > >
> > >         at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown
> Source)
> > >
> > >         at
> > > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > DelegatingMethodAccessorImpl.java:43)
> > >
> > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > >
> > >         at
> > > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> > RetryInvocationHandler.java:191)
> > >
> > >         at
> > > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> > RetryInvocationHandler.java:102)
> > >
> > >         at com.sun.proxy.$Proxy194.append(Unknown Source)
> > >
> > >         at org.apache.hadoop.hdfs.DFSClient.callAppend(
> > DFSClient.java:1808)
> > >
> > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
> > >
> > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> > doCall(DistributedFileSystem.java:340)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> > doCall(DistributedFileSystem.java:336)
> > >
> > >         at
> > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> > FileSystemLinkResolver.java:81)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> > DistributedFileSystem.java:348)
> > >
> > >         at
> > > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> > DistributedFileSystem.java:318)
> > >
> > >         at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)
> > >
> > >         at
> > > org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)
> > >
> > >         at
> > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > StandardProcessSession.java:2125)
> > >
> > >         ... 18 common frames omitted
> > >
> > > On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <
> ijokarumawak@gmail.com>
> > > wrote:
> > >
> > >> Hi Martin,
> > >>
> > >> Generally, NiFi processor doesn't load entire content of file and is
> > >> capable of handling huge files.
> > >> However, having massive amount of FlowFiles can cause OOM issue as
> > >> FlowFiles and its Attributes resides on heap.
> > >>
> > >> I assume you are using 'Line Split Count' as 1 at SplitText.
> > >> We recommend to use multiple SplitText processors to not generate many
> > >> FlowFiles in a short period of time.
> > >> For example, 1st SplitText splits files per 5,000 lines, then the 2nd
> > >> SplitText splits into each line.
> > >> This way, we can decrease number of FlowFiles at a given time
> > >> requiring less heap.
> > >>
> > >> I hope this helps.
> > >>
> > >> Thanks,
> > >> Koji
> > >>
> > >> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <martineden131@gmail.com
> >
> > >> wrote:
> > >> > Hi all,
> > >> >
> > >> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
> > >> >
> > >> > The flow I am trying to run is:
> > >> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent
> > ->
> > >> > PutHDFS
> > >> >
> > >> > When I give it a 300MB input zip file (2.5GB uncompressed) I am
> > getting
> > >> > Java OutOfMemoryError as below.
> > >> >
> > >> > Does NiFi read in the entire contents of files in memory? This is
> > >> > unexpected. I thought it is chunking through files. Giving more ram
> is
> > >> not
> > >> > a solution as you can always get larger input files in the future.
> > >> >
> > >> > Does this mean NiFi is not suitable as a scalable ETL solution?
> > >> >
> > >> > Can someone please explain what is happening and how to mitigate
> large
> > >> > files in NiFi? Any patterns?
> > >> >
> > >> > Thanks,
> > >> > M
> > >> >
> > >> > ERROR [Timer-Driven Process Thread-9]
> > >> > o.a.nifi.processors.standard.SplitText
> > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
> > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to process
> > >> > session due to java.lang.OutOfMemoryError: Java heap space: {}
> > >> >
> > >> > java.lang.OutOfMemoryError: Java heap space
> > >> >
> > >> >         at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> > >> >
> > >> >         at java.util.HashMap.putMapEntries(HashMap.java:511)
> > >> >
> > >> >         at java.util.HashMap.<init>(HashMap.java:489)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
> > >> Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
> > >> Builder.addAttributes(StandardFlowFileRecord.java:234)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.controller.repository.StandardProcessSession.
> > >> putAllAttributes(StandardProcessSession.java:1723)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.processors.standard.SplitText.
> > >> updateAttributes(SplitText.java:367)
> > >> >
> > >> >         at
> > >> >
> org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(
> > >> SplitText.java:320)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.processors.standard.SplitText.onTrigger(
> > >> SplitText.java:258)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> > >> AbstractProcessor.java:27)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> > >> StandardProcessorNode.java:1118)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > >> ContinuallyRunProcessorTask.java:144)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > >> ContinuallyRunProcessorTask.java:47)
> > >> >
> > >> >         at
> > >> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.
> > run(
> > >> TimerDrivenSchedulingAgent.java:132)
> > >> >
> > >> >         at
> > >> > java.util.concurrent.Executors$RunnableAdapter.
> > call(Executors.java:511)
> > >> >
> > >> >         at java.util.concurrent.FutureTask.runAndReset(
> > >> FutureTask.java:308)
> > >> >
> > >> >         at
> > >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> > >> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > >> >
> > >> >         at
> > >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> > >> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > >> >
> > >> >         at
> > >> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > >> ThreadPoolExecutor.java:1142)
> > >> >
> > >> >         at
> > >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > >> ThreadPoolExecutor.java:617)
> > >> >
> > >> >         at java.lang.Thread.run(Thread.java:748)
> > >>
> >
>

Re: SplitText processor OOM larger input files

Posted by Martin Eden <ma...@gmail.com>.
Hi Joe,

Thanks for the explanations. Really useful in understanding how it works.
Good to know that in the future this will be improved.

About the issue with appending to HDFS, let me recap. My flow is:
ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
-> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
                  -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
                  -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv

ListHDFS is monitoring an input folder where 300MB zip files are added
periodically. Each file is a 2.5 GB CSV when uncompressed.
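
To put rough numbers on the two SplitText stages in this flow, here is a
back-of-the-envelope sketch in Java. The 1,000,000-line figure is only an
assumption (it is the "let's say" number from Joe's reply quoted below), and
the class and method names are made up for illustration; none of this is
NiFi API.

```java
public class SplitSessionEstimate {

    /** Number of flow files one split produces: ceil(lines / linesPerSplit). */
    public static long splitsProduced(long lines, long linesPerSplit) {
        return (lines + linesPerSplit - 1) / linesPerSplit;
    }

    public static void main(String[] args) {
        long totalLines = 1_000_000L; // assumed line count, not measured
        long chunkLines = 5_000L;     // Line Split Count of the first SplitText

        // One SplitText(1): every line becomes a flow file in a single session.
        long singleStage = splitsProduced(totalLines, 1L);

        // Two stages: the first session creates one flow file per chunk,
        // and each second-stage session splits only one chunk into lines.
        long firstStage = splitsProduced(totalLines, chunkLines);
        long secondStagePerSession = splitsProduced(chunkLines, 1L);

        System.out.println("single stage, one session:   " + singleStage);
        System.out.println("two stages, first session:   " + firstStage);
        System.out.println("two stages, later sessions:  " + secondStagePerSession);
    }
}
```

Under these assumptions a single SplitText(1) would create 1,000,000 flow
files in one session, while the two-stage version creates 200 in the first
stage and at most 5,000 per session in the second.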

So I am writing out to HDFS from multiple PutHDFS processors, all of them
with conflict resolution set to *APPEND* and each with a different output
folder.

The file name, however, is the same in every case: *f.csv*. It is picked up
from the name of the flow files, which carry the name of the original
uncompressed file. I think this happens in the MergeContent processor.

Each of these processors runs with 1 concurrent task, yet it seems that we
cannot append concurrently to HDFS, even though we are appending to
different files in different folders. Any ideas how to mitigate this?

It seems other people have encountered this
<https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html>
with NiFi, but there is no conclusive solution. It also seems that
appending to HDFS is somewhat problematic
<http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369>.

So, stepping back, the reason I am using append in PutHDFS is that I could
not find a setting in the MergeContent processor that allows the creation
of multiple bundled flow files with the same root name but different
sequence numbers or timestamps (like f.csv.1, f.csv.2, ...). They all get
the same name, f.csv. Is that possible somehow? See my detailed
MergeContent processor config below.

So basically I have a 2.5 GB CSV file that eventually gets broken up into
lines, and the lines get merged together into bundles of 10 MB, but when
those bundles are emitted to PutHDFS they have the same name as the original
file over and over again. I would like them to have different names, based
on a timestamp or sequence number, say, so that I can avoid the append
conflict resolution in PutHDFS, which is causing me grief right now. Is that
possible?
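
For illustration only, the naming scheme described above (same root name,
different suffix per bundle) could be sketched in plain Java as follows. The
class and method names are hypothetical and this is not NiFi code. Inside a
flow, the usual place for such a rename would presumably be the filename
attribute of each merged flow file, set between MergeContent and PutHDFS,
for example with an UpdateAttribute processor and an Expression Language
value along the lines of ${filename}.${now():toNumber()}; treat that
expression as an untested assumption.

```java
public class BundleNames {

    /** Root name plus a sequence number, e.g. f.csv.1, f.csv.2. */
    public static String withSequence(String rootName, long sequence) {
        return rootName + "." + sequence;
    }

    /** Root name plus a millisecond timestamp. */
    public static String withTimestamp(String rootName, long epochMillis) {
        return rootName + "." + epochMillis;
    }

    public static void main(String[] args) {
        // Three bundles merged from the same original file get distinct names.
        for (long seq = 1; seq <= 3; seq++) {
            System.out.println(withSequence("f.csv", seq));
        }
        // Timestamp variant; the value depends on when this runs.
        System.out.println(withTimestamp("f.csv", System.currentTimeMillis()));
    }
}
```

A timestamp alone can collide if two bundles are emitted within the same
millisecond, so combining it with a sequence number or a random UUID is the
safer choice.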

Thanks,
M


Currently my MergeContent processor config is:
  <properties>
*   <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value> </entry>*
*   <entry> <key>Merge Format</key> <value>Binary Concatenation</value> </entry>*
   <entry> <key>Attribute Strategy</key> <value>Keep Only Common Attributes</value> </entry>
   <entry> <key>Correlation Attribute Name</key> </entry>
   <entry> <key>Minimum Number of Entries</key> <value>1</value> </entry>
   <entry> <key>Maximum Number of Entries</key> <value>1000</value> </entry>
   <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
*   <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>*
   <entry> <key>Max Bin Age</key> </entry>
   <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
   <entry> <key>Delimiter Strategy</key> <value>Text</value> </entry>
   <entry> <key>Header File</key> </entry>
   <entry> <key>Footer File</key> </entry>
   <entry> <key>Demarcator File</key> <value></value> </entry>
   <entry> <key>Compression Level</key> <value>1</value> </entry>
   <entry> <key>Keep Path</key> <value>false</value> </entry>
  </properties>


On Wed, May 31, 2017 at 3:52 PM, Joe Witt <jo...@gmail.com> wrote:

> Split failed before even with backpressure:
> - yes that backpressure kicks in when destination queues for a given
> processor have reached their target size (in count of flowfiles or
> total size represented).  However, to clarify why the OOM happened it
> is important to realize that it is not about 'flow files over a quick
> period of time' but rather 'flow files held within a single process
> session'.  Your SplitText was pulling a single flowfile but then
> creating, let's say, 1,000,000 resulting flow files and then committing
> that change.  That happens within a session.  But all those flow file
> objects (not their content) are held in memory and at such high
> numbers it creates excessive heap usage.  The two phase divide/conquer
> approach Koji suggested solves that and eventually we need to solve
> that by swapping out the flowfiles to disk within a session.  We
> actually do swap out flowfiles sitting on queues after a certain
> threshold is reached for this very reason.  This means you should be
> able to have many millions of flowfiles sitting around in the flow for
> whatever reason and not hit memory problems.
>
> Hope that helps there.
>
> On PutHDFS it looks like possibly two things are trying to append to
> the same file?  If yes I'd really recommend not appending but rather
> use MergeContent to create data bundles of a given size then write
> those to HDFS.
>
> Thanks
> Joe
>
> On Wed, May 31, 2017 at 10:33 AM, Martin Eden <ma...@gmail.com>
> wrote:
> > Hi Koji,
> >
> > Good to know that it can handle large files. I thought it was the case
> > but I was just not seeing in practice.
> >
> > Yes I am using 'Line Split Count' as 1 at SplitText.
> >
> > I added the extra SplitText processor exactly as you suggested and the
> > OOM went away. So, big thanks!!!
> >
> > However I have 2 follow-up questions:
> >
> > 1. Before adding the extra SplitText processor I also played with the
> > back-pressure settings on the outbound queue of the original SplitText
> > processor, since you mentioned that it is generating files at a rate that
> > is too high, I figure the queue should slow it down. I tried a limit of
> > 100MB or 1000 files and I still got the OOMs in the SplitText processor.
> > Why isn't the queue back-pressure helping me in this case? Where would
> > that come in handy then? Why is the extra SplitText processor needed to
> > fix things and not just the queue back-pressure?
> >
> > 2. I am now close to completing my flow but I am hitting another error.
> > This time it's the last stage, the PutHDFS throws
> > o.apache.nifi.processors.hadoop.PutHDFS
> > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
> > due to org.apache.nifi.processor.exception.ProcessException: IOException
> > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > See the full stacktrace below.
> > I have a parallelism of 1 for my PutHDFS processors. Any ideas why this
> > is happening?
> >
> > Thanks,
> > Martin
> >
> > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> > o.apache.nifi.processors.hadoop.PutHDFS
> > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
> > due to org.apache.nifi.processor.exception.ProcessException: IOException
> > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
> > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > holder.
> >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> >         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> >         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> >         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> >         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.Subject.doAs(Subject.java:422)
> >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > : {}
> >
> > org.apache.nifi.processor.exception.ProcessException: IOException thrown
> > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
> > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > holder.
> >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> >         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> >         at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> >         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> >         at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> >         at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.Subject.doAs(Subject.java:422)
> >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> >
> >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2148)
> >         at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2095)
> >         at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.java:293)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.Subject.doAs(Subject.java:360)
> >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1678)
> >         at org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)
> >         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> >         at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >         at java.lang.Thread.run(Thread.java:748)
> >
> > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
> > /nifi_out/unmatched/log20160930.csv for
> > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> holder.
> >
> >         at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> recoverLeaseInternal(FSNamesystem.java:2882)
> >
> >         at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(
> FSNamesystem.java:2683)
> >
> >         at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInt(FSNamesystem.java:2982)
> >
> >         at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFile(FSNamesystem.java:2950)
> >
> >         at
> > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> append(NameNodeRpcServer.java:655)
> >
> >         at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> deTranslatorPB.append(ClientNamenodeProtocolServerSi
> deTranslatorPB.java:421)
> >
> >         at
> > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.
> java)
> >
> >         at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> ProtobufRpcEngine.java:616)
> >
> >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> >
> >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> >
> >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> >
> >         at java.security.AccessController.doPrivileged(Native Method)
> >
> >         at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> >         at
> > org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1698)
> >
> >         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> >
> >
> >         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> >
> >         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> >
> >         at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> invoke(ProtobufRpcEngine.java:229)
> >
> >         at com.sun.proxy.$Proxy188.append(Unknown Source)
> >
> >         at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> orPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> >
> >         at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)
> >
> >         at
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> >
> >         at java.lang.reflect.Method.invoke(Method.java:498)
> >
> >         at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> RetryInvocationHandler.java:191)
> >
> >         at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> RetryInvocationHandler.java:102)
> >
> >         at com.sun.proxy.$Proxy194.append(Unknown Source)
> >
> >         at org.apache.hadoop.hdfs.DFSClient.callAppend(
> DFSClient.java:1808)
> >
> >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
> >
> >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
> >
> >         at
> > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> doCall(DistributedFileSystem.java:340)
> >
> >         at
> > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> doCall(DistributedFileSystem.java:336)
> >
> >         at
> > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> FileSystemLinkResolver.java:81)
> >
> >         at
> > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> DistributedFileSystem.java:348)
> >
> >         at
> > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> DistributedFileSystem.java:318)
> >
> >         at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)
> >
> >         at
> > org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)
> >
> >         at
> > org.apache.nifi.controller.repository.StandardProcessSession.read(
> StandardProcessSession.java:2125)
> >
> >         ... 18 common frames omitted
> >

Re: SplitText processor OOM larger input files

Posted by Joe Witt <jo...@gmail.com>.
On why the split failed even with back-pressure:
- Yes, back-pressure kicks in when the destination queues for a given
processor have reached their target size (in count of flowfiles or
total size represented).  It only affects whether the processor gets
scheduled to run again; it does not limit what a session that is
already running may create.  So to understand why the OOM happened it
is important to realize that it is not about 'flow files over a quick
period of time' but rather 'flow files held within a single process
session'.  Your SplitText was pulling a single flowfile, then
creating, let's say, 1,000,000 resulting flow files, and then
committing that change.  All of that happens within one session, and
all those flow file objects (not their content) are held in memory.
At such high numbers that creates excessive heap usage.  The two-phase
divide-and-conquer approach Koji suggested solves that, and eventually
we need to solve it properly by swapping flowfiles out to disk within
a session.  We already swap out flowfiles sitting on queues after a
certain threshold is reached, for this very reason.  This means you
should be able to have many millions of flowfiles sitting in queues
around the flow for whatever reason and not hit memory problems.
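
To make the session point concrete, here is a rough sketch in Python.
It is only a counting model, not NiFi code: the function names are made
up for illustration, and it assumes each SplitText run handles one
incoming flowfile per session, as described above. It compares the
largest number of flowfile objects any single session has to hold with
one split versus two chained splits.

```python
import math


def peak_flowfiles_single_split(total_lines: int) -> int:
    """One SplitText with Line Split Count = 1.

    Every resulting flowfile is created inside the same session,
    so the peak equals the number of lines in the input.
    """
    return total_lines


def peak_flowfiles_two_phase(total_lines: int, chunk_lines: int) -> int:
    """Two chained SplitText processors (hypothetical model).

    The first session creates one flowfile per chunk; each later
    session splits a single chunk into individual lines.
    """
    first_phase = math.ceil(total_lines / chunk_lines)  # chunks created at once
    second_phase = min(chunk_lines, total_lines)        # lines from one chunk
    return max(first_phase, second_phase)


if __name__ == "__main__":
    lines = 1_000_000
    print(peak_flowfiles_single_split(lines))      # 1000000
    print(peak_flowfiles_two_phase(lines, 5_000))  # 5000
```

With 1,000,000 lines and a first split of 5,000 lines, no session in
this model creates more than 5,000 flowfiles, where the single split
creates all 1,000,000 in one session.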

Hope that helps there.

On PutHDFS: it looks like two things may be trying to append to the
same file.  If so, I'd really recommend not appending at all.  Instead,
use MergeContent to create data bundles of a given size and then write
those to HDFS.
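
As a rough illustration of the bundling idea (not MergeContent's actual
algorithm, and not using any NiFi or HDFS API), the Python sketch below
groups small records into bundles of at least a given size, so that
each bundle could be written once as its own file rather than appended
to a shared one. The function name and the size threshold are
assumptions made for the example.

```python
from typing import Iterable, Iterator, List


def bundle_by_size(records: Iterable[bytes], min_bundle_bytes: int) -> Iterator[bytes]:
    """Concatenate records until a bundle reaches min_bundle_bytes, then emit it."""
    current: List[bytes] = []
    size = 0
    for record in records:
        current.append(record)
        size += len(record)
        if size >= min_bundle_bytes:
            yield b"".join(current)
            current, size = [], 0
    if current:
        # Flush the final bundle, which may be smaller than the threshold.
        yield b"".join(current)


if __name__ == "__main__":
    rows = [b"a,1\n", b"b,2\n", b"c,3\n", b"d,4\n", b"e,5\n"]
    for bundle in bundle_by_size(rows, min_bundle_bytes=8):
        # Each bundle would be written to HDFS as one new file.
        print(bundle)
```

Because every bundle becomes a separate write, no two writers ever need
to append to the same HDFS file.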

Thanks
Joe


Re: SplitText processor OOM larger input files

Posted by Martin Eden <ma...@gmail.com>.
Hi Koji,

Good to know that it can handle large files. I thought that was the case but
I was just not seeing it in practice.

Yes I am using 'Line Split Count' as 1 at SplitText.

I added the extra SplitText processor exactly as you suggested and the OOM
went away. So, big thanks!!!

However I have 2 follow-up questions:

1. Before adding the extra SplitText processor I also played with the
back-pressure settings on the outbound queue of the original SplitText
processor. Since you mentioned that it is generating files at a rate that
is too high, I figured the queue should slow it down. I tried a limit of
100MB or 1000 files and I still got the OOMs in the SplitText processor.
Why isn't the queue back-pressure helping me in this case? Where would that
come in handy then? Why is the extra SplitText processor needed to fix
things and not just the queue back-pressure?

2. I am now close to completing my flow but I am hitting another error.
This time it's the last stage, the PutHDFS throws
o.apache.nifi.processors.hadoop.PutHDFS
PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
due to org.apache.nifi.processor.exception.ProcessException: IOException
thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
See the full stacktrace below.
I have a parallelism of 1 for my PutHDFS processors. Any ideas why this is
happening?

Thanks,
Martin

2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
o.apache.nifi.processors.hadoop.PutHDFS
PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS due to
org.apache.nifi.processor.exception.ProcessException: IOException thrown
from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7
because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
holder.

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)

        at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)

        at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)

        at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:422)

        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

: {}

org.apache.nifi.processor.exception.ProcessException: IOException thrown
from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)

        at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)

        at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)

        at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:422)

        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)


        at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2148)

        at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2095)

        at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.java:293)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:360)

        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1678)

        at
org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)

        at
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)

        at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)

        at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)

        at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)

        at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)

        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)

        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
/nifi_out/unmatched/log20160930.csv for
DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)

        at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)

        at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)

        at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:422)

        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)


        at org.apache.hadoop.ipc.Client.call(Client.java:1475)

        at org.apache.hadoop.ipc.Client.call(Client.java:1412)

        at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)

        at com.sun.proxy.$Proxy188.append(Unknown Source)

        at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)

        at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)

        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)

        at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

        at com.sun.proxy.$Proxy194.append(Unknown Source)

        at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)

        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)

        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)

        at
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)

        at
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)

        at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)

        at
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)

        at
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)

        at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)

        at
org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)

        at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2125)

        ... 18 common frames omitted

On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <ij...@gmail.com>
wrote:

> Hi Martin,
>
> Generally, NiFi processors don't load the entire content of a file into
> memory, so they are capable of handling huge files.
> However, having a massive number of FlowFiles can cause OOM issues, as
> FlowFiles and their attributes reside on the heap.
>
> I assume you have 'Line Split Count' set to 1 in SplitText.
> We recommend using multiple SplitText processors so that you don't
> generate too many FlowFiles in a short period of time.
> For example, the 1st SplitText splits files into chunks of 5,000 lines,
> then the 2nd SplitText splits each chunk into individual lines.
> This way, we can decrease the number of FlowFiles held at any given time,
> which requires less heap.
>
> I hope this helps.
>
> Thanks,
> Koji
>
> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <ma...@gmail.com>
> wrote:
> > Hi all,
> >
> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
> >
> > The flow I am trying to run is:
> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent ->
> > PutHDFS
> >
> > When I give it a 300MB input zip file (2.5GB uncompressed) I am getting
> > Java OutOfMemoryError as below.
> >
> > Does NiFi read in the entire contents of files in memory? This is
> > unexpected. I thought it is chunking through files. Giving more ram is not
> > a solution as you can always get larger input files in the future.
> >
> > Does this mean NiFi is not suitable as a scalable ETL solution?
> >
> > Can someone please explain what is happening and how to mitigate large
> > files in NiFi? Any patterns?
> >
> > Thanks,
> > M
> >
> > ERROR [Timer-Driven Process Thread-9]
> > o.a.nifi.processors.standard.SplitText
> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to process
> > session due to java.lang.OutOfMemoryError: Java heap space: {}
> >
> > java.lang.OutOfMemoryError: Java heap space
> >
> >         at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> >
> >         at java.util.HashMap.putMapEntries(HashMap.java:511)
> >
> >         at java.util.HashMap.<init>(HashMap.java:489)
> >
> >         at
> > org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> >
> >         at
> > org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.addAttributes(StandardFlowFileRecord.java:234)
> >
> >         at
> > org.apache.nifi.controller.repository.StandardProcessSession.putAllAttributes(StandardProcessSession.java:1723)
> >
> >         at
> > org.apache.nifi.processors.standard.SplitText.updateAttributes(SplitText.java:367)
> >
> >         at
> > org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(SplitText.java:320)
> >
> >         at
> > org.apache.nifi.processors.standard.SplitText.onTrigger(SplitText.java:258)
> >
> >         at
> > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >
> >         at
> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> >
> >         at
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> >
> >         at
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >
> >         at
> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >
> >         at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >
> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >
> >         at
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >
> >         at
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >
> >         at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >
> >         at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >
> >         at java.lang.Thread.run(Thread.java:748)
>

Re: SplitText processor OOM larger input files

Posted by Koji Kawamura <ij...@gmail.com>.
Hi Martin,

Generally, NiFi processors don't load the entire content of a file into
memory, so they are capable of handling huge files.
However, having a massive number of FlowFiles can cause OOM issues, as
FlowFiles and their attributes reside on the heap.

I assume you have 'Line Split Count' set to 1 in SplitText.
We recommend using multiple SplitText processors so that you don't
generate too many FlowFiles in a short period of time.
For example, the 1st SplitText splits files into chunks of 5,000 lines,
then the 2nd SplitText splits each chunk into individual lines.
This way, we can decrease the number of FlowFiles held at any given time,
which requires less heap.
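
To make the arithmetic behind the two-stage split concrete, here is a rough
back-of-the-envelope sketch. It is not NiFi code: the function names are
made up for illustration, the 25,000,000 line count is a hypothetical figure
(the thread only gives the file size), and it only counts how many FlowFiles
one split step has to create at once. It ignores back pressure, queue
swapping and attribute sizes.

```python
def flowfiles_created(total_lines: int, line_split_count: int) -> int:
    """FlowFiles produced when one input of `total_lines` lines is split
    into pieces of `line_split_count` lines (integer ceiling division)."""
    return -(-total_lines // line_split_count)


def peak_single_stage(total_lines: int) -> int:
    """One SplitText with 'Line Split Count' = 1: every line becomes a
    FlowFile in the same split step."""
    return flowfiles_created(total_lines, 1)


def peak_two_stage(total_lines: int, chunk_lines: int = 5000) -> int:
    """Two SplitText processors in a row. The first emits chunks of
    `chunk_lines` lines; the second splits one chunk at a time into
    single lines. Returns the larger of the two per-step counts."""
    first_step = flowfiles_created(total_lines, chunk_lines)
    second_step = flowfiles_created(min(chunk_lines, total_lines), 1)
    return max(first_step, second_step)


# Hypothetical line count for a multi-GB CSV; not taken from the thread.
TOTAL_LINES = 25_000_000

if __name__ == "__main__":
    print("single stage:", peak_single_stage(TOTAL_LINES))
    print("two stage:   ", peak_two_stage(TOTAL_LINES))
```

Under these assumptions the single-stage split has to create one FlowFile
per line in one go, while each step of the two-stage split creates only a
few thousand, which is the effect described above.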

I hope this helps.

Thanks,
Koji

On Wed, May 31, 2017 at 6:20 PM, Martin Eden <ma...@gmail.com> wrote:
> Hi all,
>
> I have a vanilla Nifi 1.2.0 node with 1GB of heap.
>
> The flow I am trying to run is:
> ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent ->
> PutHDFS
>
> When I give it a 300MB input zip file (2.5GB uncompressed) I am getting
> Java OutOfMemoryError as below.
>
> Does NiFi read in the entire contents of files in memory? This is
> unexpected. I thought it is chunking through files. Giving more ram is not
> a solution as you can always get larger input files in the future.
>
> Does this mean NiFi is not suitable as a scalable ETL solution?
>
> Can someone please explain what is happening and how to mitigate large
> files in NiFi? Any patterns?
>
> Thanks,
> M
>
> ERROR [Timer-Driven Process Thread-9]
> o.a.nifi.processors.standard.SplitText
> SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
> SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to process
> session due to java.lang.OutOfMemoryError: Java heap space: {}
>
> java.lang.OutOfMemoryError: Java heap space
>
>         at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
>
>         at java.util.HashMap.putMapEntries(HashMap.java:511)
>
>         at java.util.HashMap.<init>(HashMap.java:489)
>
>         at
> org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.initializeAttributes(StandardFlowFileRecord.java:219)
>
>         at
> org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.addAttributes(StandardFlowFileRecord.java:234)
>
>         at
> org.apache.nifi.controller.repository.StandardProcessSession.putAllAttributes(StandardProcessSession.java:1723)
>
>         at
> org.apache.nifi.processors.standard.SplitText.updateAttributes(SplitText.java:367)
>
>         at
> org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(SplitText.java:320)
>
>         at
> org.apache.nifi.processors.standard.SplitText.onTrigger(SplitText.java:258)
>
>         at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>
>         at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
>
>         at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
>
>         at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>
>         at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
>
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:748)