Posted to dev@flume.apache.org by "Adam Gent (JIRA)" <ji...@apache.org> on 2014/06/18 19:42:24 UTC

[jira] [Commented] (FLUME-2403) FileChannel causes OOME for large messages.

    [ https://issues.apache.org/jira/browse/FLUME-2403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14036012#comment-14036012 ] 

Adam Gent commented on FLUME-2403:
----------------------------------

I updated the title. 

{quote}Curious how do you plan to measure the size of the incoming event in the source w/o actually making a copy of it into memory.{quote}

Just like how many other protocols work: I would hope that a header would be set with the payload size. This is how HTTP works, for example: a header describes the body size, or indicates that the body is streamed. The headers would obviously always be loaded into memory, but the body might not be.

I realize that is not how the Flume {{Event}} currently works, so my only option for size filtering in Flume is a custom {{Source}} that reads the headers from the original source (AMQP or whatever) to determine whether it should drop the message.
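A minimal sketch of that header-based filter, assuming the upstream protocol sets a hypothetical {{payload-size}} header (neither Flume nor AMQP defines one; the header name and the 1 MB cutoff are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class SizeHeaderCheck {
    static final long MAX_BODY_BYTES = 1024 * 1024; // 1 MB cutoff (example value)

    // Decide whether to drop an event based on a declared-size header,
    // without ever reading the body into memory.
    static boolean shouldDrop(Map<String, String> headers) {
        String declared = headers.get("payload-size"); // hypothetical header
        if (declared == null) {
            return false; // no declared size: fall through to normal handling
        }
        try {
            return Long.parseLong(declared) > MAX_BODY_BYTES;
        } catch (NumberFormatException e) {
            return false; // malformed size: don't drop on a parse error
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("payload-size", "5242880"); // 5 MB declared
        System.out.println(shouldDrop(headers)); // prints "true"
    }
}
```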

Given that our average message size is about 4K (and not 4MB), I am curious what sort of performance we would get by deferring loading of the body until it is absolutely needed, and only then pulling the byte[] from the filesystem or a configurable buffer (mmap). In other words: no copying, and a lazy byte[], since I can't change the {{Event}} contract. Perhaps a workaround is even special headers that tell the FileChannel where to pick up the body from a file created by the Source. This would avoid keeping the byte[] in memory for long and rely on the kernel filesystem cache instead.
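The lazy-body idea could be sketched roughly like this ({{LazyBody}} is an illustrative class, not part of Flume's API; it records where the body lives on disk and only maps and copies the bytes on first access):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

public class LazyBody {
    private final Path file;   // file written by the Source
    private final long offset; // where the body starts in that file
    private final int length;  // body size in bytes
    private byte[] body;       // materialized only on first access

    public LazyBody(Path file, long offset, int length) {
        this.file = file;
        this.offset = offset;
        this.length = length;
    }

    public synchronized byte[] getBody() throws IOException {
        if (body == null) {
            try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r");
                 FileChannel ch = raf.getChannel()) {
                // mmap the region; the kernel page cache backs the read.
                MappedByteBuffer buf =
                    ch.map(FileChannel.MapMode.READ_ONLY, offset, length);
                body = new byte[length];
                buf.get(body); // the copy happens only when the body is demanded
            }
        }
        return body;
    }
}
```

The heap cost is deferred until a sink actually calls {{getBody()}}, and repeated reads of hot files are served from the kernel filesystem cache.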



> FileChannel causes OOME for large messages.
> -------------------------------------------
>
>                 Key: FLUME-2403
>                 URL: https://issues.apache.org/jira/browse/FLUME-2403
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.5.0
>            Reporter: Adam Gent
>            Priority: Minor
>
> The spillable memory channel will fail rather badly on large messages.
> {code}
> Error while writing to required channel: FileChannel es1 { dataDirs: [/var/lib/flume/data] }
> java.lang.OutOfMemoryError: Java heap space
>         at java.util.Arrays.copyOf(Arrays.java:2271)
>         at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>         at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>         at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>         at com.google.protobuf.CodedOutputStream.writeRawBytes(CodedOutputStream.java:984)
>         at com.google.protobuf.CodedOutputStream.writeRawBytes(CodedOutputStream.java:905)
>         at com.google.protobuf.CodedOutputStream.writeBytesNoTag(CodedOutputStream.java:386)
>         at com.google.protobuf.CodedOutputStream.writeBytes(CodedOutputStream.java:229)
>         at org.apache.flume.channel.file.proto.ProtosFactory$FlumeEvent.writeTo(ProtosFactory.java:6259)
>         at com.google.protobuf.CodedOutputStream.writeMessageNoTag(CodedOutputStream.java:380)
>         at com.google.protobuf.CodedOutputStream.writeMessage(CodedOutputStream.java:222)
>         at org.apache.flume.channel.file.proto.ProtosFactory$Put.writeTo(ProtosFactory.java:4112)
>         at com.google.protobuf.AbstractMessageLite.writeDelimitedTo(AbstractMessageLite.java:90)
>         at org.apache.flume.channel.file.Put.writeProtos(Put.java:93)
>         at org.apache.flume.channel.file.TransactionEventRecord.toByteBuffer(TransactionEventRecord.java:174)
>         at org.apache.flume.channel.file.Log.put(Log.java:611)
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:458)
>         at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>         at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.commitPutsToOverflow(SpillableMemoryChannel.java:490)
>         at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.putCommit(SpillableMemoryChannel.java:480)
>         at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doCommit(SpillableMemoryChannel.java:401)
>         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
>         at org.apache.flume.source.rabbitmq.RabbitMQSource.process(RabbitMQSource.java:162)
>         at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
>         at java.lang.Thread.run(Thread.java:744)
> {code}
> My config:
> {code}
> agent.channels.es1.type = SPILLABLEMEMORY
> agent.channels.es1.memoryCapacity = 10000
> agent.channels.es1.overflowCapacity = 1000000
> agent.channels.es1.byteCapacity = 800000
> agent.channels.es1.checkpointDir = /var/lib/flume/checkpoint
> agent.channels.es1.dataDirs = /var/lib/flume/data
> {code}
> I haven't looked at the code, but I have some concerns, like why a ByteArrayOutputStream is used instead of some other buffered stream writing directly to the filesystem. Perhaps it's because of the transactional nature, but I'm pretty sure you can write to the filesystem and roll back; Kafka and modern databases do this with fsync.
> One could argue that I should just raise the max heap, but this message is coming from RabbitMQ, which had no issue holding on to it (I believe the message is around 500K).
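The buffered-stream alternative raised in the description could look roughly like this ({{StreamingPut}} and the length-prefixed framing are illustrative, not FileChannel's actual record format; the point is that heap usage stays bounded by the fixed buffer rather than by the event size):

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class StreamingPut {
    // Stream one length-prefixed record straight to disk through a 64 KB
    // buffer, then fsync; no full in-memory copy of the serialized event.
    static void writeRecord(FileOutputStream out, byte[] header, byte[] body)
            throws IOException {
        DataOutputStream data =
            new DataOutputStream(new BufferedOutputStream(out, 64 * 1024));
        data.writeInt(header.length);
        data.write(header);
        data.writeInt(body.length);
        data.write(body);       // heap usage bounded by the 64 KB buffer
        data.flush();           // push buffered bytes into the file stream
        out.getFD().sync();     // durability point, comparable to fsync
    }
}
```

Rollback would then mean truncating back to the last committed offset, which is essentially the log-plus-fsync scheme the comment attributes to Kafka and databases.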



--
This message was sent by Atlassian JIRA
(v6.2#6252)