You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Balthasar Schopman <b....@tech.leaseweb.com> on 2015/10/13 09:12:30 UTC

recovery after memory transaction capacity is exceeded

Hi,

I'm creating a proof-of-concept of a Flume agent that'll buffer events and stops consuming events from the source when the sink is unavailable. Only when the sink is available again, the buffered events should be processed and then the source restarts consumption.

For this I've created a simple agent, which reads from a SpoolDir and writes to a file. To simulate that the sink service is down, I change file permissions so Flume can't write to it. Then I start Flume some events are buffered in the memory channel and it stops consuming events when the channel capacity is full, as expected. As soon as the file becomes writeable, the sink is able to process the events and Flume recovers. However, that only works when the transaction capacity is not exceeded. As soon as the transaction capacity is exceeded, Flume never recovers and keeps writing the following error:

    2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR -
    org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to
    deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: Failed to process transaction
        at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction,
    capacity 4 full, consider committing more frequently, increasing capacity, or
    increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
        ... 3 more

As soon as the number of events buffered in memory exceed the transaction capacity (4) this error occurs. I don't understand why, because the batchSize of the fileout is 1, so it should take out the events one by one.

This is the config I'm using:

    agent.sources = spool-src
    agent.channels = mem-channel
    agent.sinks = fileout

    agent.sources.spool-src.channels = mem-channel
    agent.sources.spool-src.type = spooldir
    agent.sources.spool-src.spoolDir = /tmp/flume-spool
    agent.sources.spool-src.batchSize = 1

    agent.channels.mem-channel.type = memory
    agent.channels.mem-channel.capacity = 10
    agent.channels.mem-channel.transactionCapacity = 4

    agent.sinks.fileout.channel = mem-channel
    agent.sinks.fileout.type = file_roll
    agent.sinks.fileout.sink.directory = /tmp/flume-output
    agent.sinks.fileout.sink.rollInterval = 0
    agent.sinks.fileout.batchSize = 1

I've tested this config with different values for the channel capacity & transaction capacity (e.g., 3 and 3), but haven't found a config where Flume is able to recover after the channel capacity is full. Any ideas on how to achieve this?

--
Kind regards,
Balthasar Schopman
LeaseWeb CDN Innovation Engineer

Kind regards,

Balthasar Schopman
Software Developer
LeaseWeb Technologies B.V.

T: +31 20 316 0232
M:
E: b.schopman@tech.leaseweb.com
W: http://www.leaseweb.com

Luttenbergweg 8, 1101 EC Amsterdam, Netherlands



RE: recovery after memory transaction capacity is exceeded

Posted by Balthasar Schopman <b....@tech.leaseweb.com>.
I re-ran the test with the source & sink batchSizes set to 100, the memory channel transactionCapacity set to 100 and its capacity on 300. With those values, the proof of concept works exactly as expected, so it indeed that bug that caused my problem. Thank you!

To specify the statement "the proof of concept works exactly as expected": if Flume isn't able to write events to the sink, it will read files from the spoolDir until the memory capacity is maxed out (i.e. 300 events). Any files that are added to the spoolDir are _not_ processed, until Flume is able to write to the sink. As soon as Flume is able to write to the sink again, all the stalled events are processed (the events in memory and the unread files in the spoolDir).

--
Kind regards,
Balthasar Schopman
LeaseWeb CDN Innovation Engineer


Kind regards,

Balthasar Schopman
Software Developer
LeaseWeb Technologies B.V.

T: +31 20 316 0232

E: b.schopman@tech.leaseweb.com
W: www.leaseweb.com<http://www.leaseweb.com>

Luttenbergweg 8,        1101 EC Amsterdam,      Netherlands


________________________________
From: Gonzalo Herreros [gherreros@gmail.com]
Sent: Tuesday, October 13, 2015 10:04
To: user@flume.apache.org
Subject: Re: recovery after memory transaction capacity is exceeded

I believe you are suffering from this bug: https://issues.apache.org/jira/browse/FLUME-2778
So when it's running is able to keep up but when the channel has more than 4 events queued, the Sink tried to extract 100 (default batch size) and you get that error.

Regards,
Gonzalo

On 13 October 2015 at 08:12, Balthasar Schopman <b....@tech.leaseweb.com>> wrote:
Hi,

I'm creating a proof-of-concept of a Flume agent that'll buffer events and stops consuming events from the source when the sink is unavailable. Only when the sink is available again, the buffered events should be processed and then the source restarts consumption.

For this I've created a simple agent, which reads from a SpoolDir and writes to a file. To simulate that the sink service is down, I change file permissions so Flume can't write to it. Then I start Flume some events are buffered in the memory channel and it stops consuming events when the channel capacity is full, as expected. As soon as the file becomes writeable, the sink is able to process the events and Flume recovers. However, that only works when the transaction capacity is not exceeded. As soon as the transaction capacity is exceeded, Flume never recovers and keeps writing the following error:

    2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR -
    org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to
    deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: Failed to process transaction
        at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction,
    capacity 4 full, consider committing more frequently, increasing capacity, or
    increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
        ... 3 more

As soon as the number of events buffered in memory exceed the transaction capacity (4) this error occurs. I don't understand why, because the batchSize of the fileout is 1, so it should take out the events one by one.

This is the config I'm using:

    agent.sources = spool-src
    agent.channels = mem-channel
    agent.sinks = fileout

    agent.sources.spool-src.channels = mem-channel
    agent.sources.spool-src.type = spooldir
    agent.sources.spool-src.spoolDir = /tmp/flume-spool
    agent.sources.spool-src.batchSize = 1

    agent.channels.mem-channel.type = memory
    agent.channels.mem-channel.capacity = 10
    agent.channels.mem-channel.transactionCapacity = 4

    agent.sinks.fileout.channel = mem-channel
    agent.sinks.fileout.type = file_roll
    agent.sinks.fileout.sink.directory = /tmp/flume-output
    agent.sinks.fileout.sink.rollInterval = 0
    agent.sinks.fileout.batchSize = 1

I've tested this config with different values for the channel capacity & transaction capacity (e.g., 3 and 3), but haven't found a config where Flume is able to recover after the channel capacity is full. Any ideas on how to achieve this?

--
Kind regards,
Balthasar Schopman
LeaseWeb CDN Innovation Engineer

Kind regards,

Balthasar Schopman
Software Developer
LeaseWeb Technologies B.V.

T: +31 20 316 0232
M:
E: b.schopman@tech.leaseweb.com<ma...@tech.leaseweb.com>
W: http://www.leaseweb.com

Luttenbergweg 8, 1101 EC Amsterdam, Netherlands





Re: recovery after memory transaction capacity is exceeded

Posted by Gonzalo Herreros <gh...@gmail.com>.
I believe you are suffering from this bug:
https://issues.apache.org/jira/browse/FLUME-2778
So when it's running is able to keep up but when the channel has more than
4 events queued, the Sink tried to extract 100 (default batch size) and you
get that error.

Regards,
Gonzalo

On 13 October 2015 at 08:12, Balthasar Schopman <
b.schopman@tech.leaseweb.com> wrote:

> Hi,
>
> I'm creating a proof-of-concept of a Flume agent that'll buffer events and
> stops consuming events from the source when the sink is unavailable. Only
> when the sink is available again, the buffered events should be processed
> and then the source restarts consumption.
>
> For this I've created a simple agent, which reads from a SpoolDir and
> writes to a file. To simulate that the sink service is down, I change file
> permissions so Flume can't write to it. Then I start Flume some events are
> buffered in the memory channel and it stops consuming events when the
> channel capacity is full, as expected. As soon as the file becomes
> writeable, the sink is able to process the events and Flume recovers.
> However, that only works when the transaction capacity is not exceeded. As
> soon as the transaction capacity is exceeded, Flume never recovers and
> keeps writing the following error:
>
>     2015-10-02 14:52:51,940
> (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR -
>     org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]
> Unable to
>     deliver event. Exception follows.
>     org.apache.flume.EventDeliveryException: Failed to process transaction
>         at
> org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
>     Caused by: org.apache.flume.ChannelException: Take list for
> MemoryTransaction,
>     capacity 4 full, consider committing more frequently, increasing
> capacity, or
>     increasing thread count
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at
> org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
>         ... 3 more
>
> As soon as the number of events buffered in memory exceed the transaction
> capacity (4) this error occurs. I don't understand why, because the
> batchSize of the fileout is 1, so it should take out the events one by one.
>
> This is the config I'm using:
>
>     agent.sources = spool-src
>     agent.channels = mem-channel
>     agent.sinks = fileout
>
>     agent.sources.spool-src.channels = mem-channel
>     agent.sources.spool-src.type = spooldir
>     agent.sources.spool-src.spoolDir = /tmp/flume-spool
>     agent.sources.spool-src.batchSize = 1
>
>     agent.channels.mem-channel.type = memory
>     agent.channels.mem-channel.capacity = 10
>     agent.channels.mem-channel.transactionCapacity = 4
>
>     agent.sinks.fileout.channel = mem-channel
>     agent.sinks.fileout.type = file_roll
>     agent.sinks.fileout.sink.directory = /tmp/flume-output
>     agent.sinks.fileout.sink.rollInterval = 0
>     agent.sinks.fileout.batchSize = 1
>
> I've tested this config with different values for the channel capacity &
> transaction capacity (e.g., 3 and 3), but haven't found a config where
> Flume is able to recover after the channel capacity is full. Any ideas on
> how to achieve this?
>
> --
> Kind regards,
> Balthasar Schopman
> LeaseWeb CDN Innovation Engineer
>
> Kind regards,
>
> Balthasar Schopman
> Software Developer
> LeaseWeb Technologies B.V.
>
> T: +31 20 316 0232
> M:
> E: b.schopman@tech.leaseweb.com
> W: http://www.leaseweb.com
>
> Luttenbergweg 8, 1101 EC Amsterdam, Netherlands
>
>
>